merge: #1262
1262: feat(dal,sdf,si-data): introduce "cloneable" Postgres transactions r=fnichol a=fnichol

This change alters some of the internal details of our `pg` module in
`si-data`, which is an adapter layer on top of the `tokio-postgres` and
`deadpool-postgres` crates.

The change is largely in the `InstrumentedTransaction` type, which now
holds an `Arc<Mutex<Transaction<'a>>>` (formerly it held a bare
`Transaction<'a>`), allowing the type to derive `Clone`. We now have a
way of "cloning" an in-flight database transaction and passing ownership
to other functions, Tokio tasks, etc. The key here is that all copies
share the same mutex-guarded `Transaction<'a>`, so the consumers of
these copies coordinate within the singular transaction. As the
synchronized access is all internal to `InstrumentedTransaction`, the
external API did not change.

*However*, there are 2 reasons for signature changes, which consequently
resulted in a large number of small changes across the dal, sdf, and
si-data crates:

1. The signature of `DalContext` needs an additional explicit (but often
   anonymous) lifetime. Sorry! When the `DalContext` type was initially
   developed, the borrowed `Transactions` type used the *same* lifetime
   for the borrow and for the internal lifetime of the type. That is,
   prior to this change the field signature was `txns: &'t
   Transactions<'t>`, which was fine because the internals of
   `Transactions` need to live at least as long as the borrow of
   `Transactions` itself. With this new change we occasionally need to
   describe the lifetime inside `Transactions` separately from its
   borrowed lifetime. That is, the field signature is now `txns: &'t
   Transactions<'a>` where, if necessary, the bound `'a: 't` applies.
   This means that `'a` needs to live *at least* as long as `'t` but
   could live longer. It is this tweak that allows our existing
   `txns.commit().await` to work while the `txns` is borrowed into our
   `DalContext` (see the first sketch after this list). Cool? Maybe
   not, but that's why! Speaking of `commit`...
2. The caller of a `pg_txn.commit().await` needs to ensure that there
   are no other copies of `pg_txn` still in scope. The prior
   implementation guaranteed this, as there was no way to clone
   transactions and, importantly, the underlying `tokio-postgres`
   crate's `Transaction.commit` signature consumes `self`. Our upgraded
   `InstrumentedTransaction` now wraps this transaction in
   `Arc<Mutex<...>>`, meaning we also need to consume these data types
   in `commit` (note that this applies equally to the `rollback`
   method). To accomplish this in `commit`, we assert that there are no
   other `Arc` copies and only 1 remains (Rust lets you ask an `Arc`
   for its "strong count" of references), and if this holds true we can
   safely unwrap the `Arc` and then consume the inner `Mutex`,
   resulting in a fully consumable `Transaction` type (see the second
   sketch after this list). While this change sounds potentially
   troubling, in practice no code so far needed to be modified for the
   correct behavior of the codebase and test suites. It's my assertion
   that any error coming back on `commit` and/or `rollback` related to
   these assertion checks/unwraps is caused either by a concurrent race
   calling `commit` just before other `Arc` copies are fully dropped,
   or by a straight-up programmer error. See the documentation for
   `InstrumentedTransaction.commit` and `rollback` for more details.
   Unfortunately, these 2 new error conditions (one each for `commit`
   and `rollback`) meant introducing a proper error type that wraps
   `tokio_postgres::Error` and updating other parts of the codebase
   that were referring directly to the `tokio_postgres::Error` type. A
   nice side effect was finding that, once this refactoring was done,
   there was no longer a reason for `lib/dal` to take a direct
   dependency on `tokio-postgres`, which felt good and right.
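
For point 1, here is a minimal sketch of the lifetime split. The field and
method names are illustrative, not the real dal definitions, and the first
lifetime parameter merely stands in for the other borrowed context data the
real `DalContext` holds.

```rust
use tokio_postgres::Transaction;

// The lifetime of the data inside `Transactions` ('a) is now described
// separately from the lifetime of the borrow of `Transactions` ('t).
pub struct Transactions<'a> {
    pub pg_txn: Transaction<'a>,
}

// The extra lifetime is why call sites changed from `DalContext<'_, '_>` to
// `DalContext<'_, '_, '_>`.
pub struct DalContext<'s, 'a, 't> {
    pub services: std::marker::PhantomData<&'s ()>,
    pub txns: &'t Transactions<'a>,
}

impl<'s, 'a, 't> DalContext<'s, 'a, 't> {
    // The struct definition implies `'a: 't`: the data inside `Transactions`
    // must outlive the borrow, but may live longer. That is what lets the
    // owning `Transactions` still be committed after a `DalContext` that
    // borrowed it has gone away.
    pub fn txns(&self) -> &'t Transactions<'a> {
        self.txns
    }
}
```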

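And for point 2, a sketch of the consuming `commit` under the same
assumptions. The error type and its variant names are made up for this
sketch; the real error type in `si-data` simply needs to wrap
`tokio_postgres::Error` alongside the new assertion failures.

```rust
use std::sync::Arc;

use thiserror::Error;
use tokio::sync::Mutex;
use tokio_postgres::Transaction;

// Illustrative error type wrapping `tokio_postgres::Error`.
#[derive(Debug, Error)]
pub enum PgTxnError {
    #[error("{0} other copies of the transaction are still in scope")]
    TxnCopiesOutstanding(usize),
    #[error("pg error: {0}")]
    Pg(#[from] tokio_postgres::Error),
}

#[derive(Clone)]
pub struct InstrumentedTransaction<'a> {
    inner: Arc<Mutex<Transaction<'a>>>,
}

impl<'a> InstrumentedTransaction<'a> {
    // Consumes `self`, mirroring `tokio_postgres::Transaction::commit`.
    pub async fn commit(self) -> Result<(), PgTxnError> {
        // Assert this is the last `Arc` copy: `try_unwrap` only succeeds when
        // the strong count is 1, otherwise it hands the `Arc` back and we
        // report how many other copies are still alive (a concurrent race or
        // a programmer error).
        let mutex = Arc::try_unwrap(self.inner).map_err(|arc| {
            PgTxnError::TxnCopiesOutstanding(Arc::strong_count(&arc) - 1)
        })?;
        // Consume the inner `Mutex`, leaving a fully consumable `Transaction`.
        mutex.into_inner().commit().await?;
        Ok(())
    }

    // `rollback` follows the same pattern, ending in `Transaction::rollback`.
}
```
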
<img src="https://media2.giphy.com/media/ToMjGpmGUC7axrjBAmQ/giphy.gif"/>

Signed-off-by: Fletcher Nichol <fletcher@systeminit.com>

Co-authored-by: Fletcher Nichol <fletcher@systeminit.com>
si-bors-ng[bot] and fnichol committed Sep 6, 2022
2 parents fe92fc2 + 8f1c99b commit 580a72c
Showing 149 changed files with 1,124 additions and 748 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion lib/dal/Cargo.toml
@@ -28,7 +28,6 @@ telemetry = { path = "../../lib/telemetry-rs" }
tempfile = "3.2.0"
thiserror = "1.0.24"
tokio = { version = "1.2.0", features = ["full"] }
- tokio-postgres = { version = "0.7.0", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] }
tokio-stream = "0.1.3"
url = "2.2.2"
uuid = { version = "1.0.0", features = ["v4"] }
20 changes: 10 additions & 10 deletions lib/dal/src/attribute/prototype.rs
@@ -145,7 +145,7 @@ impl AttributePrototype {
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn new(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
func_id: FuncId,
func_binding_id: FuncBindingId,
func_binding_return_value_id: FuncBindingReturnValueId,
@@ -233,7 +233,7 @@ impl AttributePrototype {
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
pub async fn new_with_existing_value(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
func_id: FuncId,
context: AttributeContext,
key: Option<String>,
@@ -293,7 +293,7 @@ impl AttributePrototype {
/// Caution: this should be used rather than [`StandardModel::delete()`] when deleting an
/// [`AttributePrototype`]. That method should never be called directly.
pub async fn remove(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: &AttributePrototypeId,
) -> AttributePrototypeResult<()> {
// Get the prototype for the given id. Once we get its corresponding value, we can delete
@@ -367,7 +367,7 @@ impl AttributePrototype {

#[instrument(skip_all)]
pub async fn list_for_context(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
context: AttributeContext,
) -> AttributePrototypeResult<Vec<Self>> {
let rows = ctx
@@ -389,7 +389,7 @@ impl AttributePrototype {

#[tracing::instrument(skip_all)]
pub async fn find_with_parent_value_and_key_for_context(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
parent_attribute_value_id: Option<AttributeValueId>,
key: Option<String>,
context: AttributeContext,
@@ -414,7 +414,7 @@ impl AttributePrototype {

/// List [`Vec<Self>`] that depend on a provided [`InternalProviderId`](crate::InternalProvider).
pub async fn list_from_internal_provider_use(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
internal_provider_id: InternalProviderId,
) -> AttributePrototypeResult<Vec<Self>> {
let rows = ctx
@@ -430,7 +430,7 @@ impl AttributePrototype {
/// List [`Vec<Self>`] that depend on a provided [`ExternalProviderId`](crate::ExternalProvider)
/// and _tail_ [`ComponentId`](crate::Component).
pub async fn list_by_head_from_external_provider_use_with_tail(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
external_provider_id: ExternalProviderId,
tail_component_id: ComponentId,
) -> AttributePrototypeResult<Vec<AttributePrototypeGroupByHeadComponentId>> {
@@ -466,7 +466,7 @@ impl AttributePrototype {
/// and whose context contains the provided [`AttributeReadContext`](crate::AttributeReadContext)
/// or are "more-specific" than the provided [`AttributeReadContext`](crate::AttributeReadContext).
pub async fn attribute_values_in_context_or_greater(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: AttributePrototypeId,
context: AttributeReadContext,
) -> AttributePrototypeResult<Vec<AttributeValue>> {
@@ -489,7 +489,7 @@ impl AttributePrototype {
#[allow(clippy::too_many_arguments)]
#[async_recursion]
async fn create_intermediate_proxy_values(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
parent_attribute_value_id: Option<AttributeValueId>,
prototype_id: AttributePrototypeId,
context: AttributeContext,
@@ -554,7 +554,7 @@ impl AttributePrototype {

#[allow(clippy::too_many_arguments)]
pub async fn update_for_context(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: AttributePrototypeId,
context: AttributeContext,
func_id: FuncId,
16 changes: 8 additions & 8 deletions lib/dal/src/attribute/prototype/argument.rs
@@ -96,7 +96,7 @@ impl AttributePrototypeArgument {
#[instrument(skip_all)]
/// Create a new [`AttributePrototypeArgument`] for _intra_ [`Component`](crate::Component) use.
pub async fn new_for_intra_component(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: AttributePrototypeId,
name: impl AsRef<str>,
internal_provider_id: InternalProviderId,
@@ -133,7 +133,7 @@ impl AttributePrototypeArgument {
/// Create a new [`AttributePrototypeArgument`] for _inter_ [`Component`](crate::Component) use.
#[instrument(skip_all)]
pub async fn new_for_inter_component(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: AttributePrototypeId,
name: impl AsRef<str>,
head_component_id: ComponentId,
@@ -204,7 +204,7 @@ impl AttributePrototypeArgument {
/// cannot become unset and vice versa.
pub async fn set_internal_provider_id_safe(
mut self,
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
internal_provider_id: InternalProviderId,
) -> AttributePrototypeArgumentResult<()> {
if self.internal_provider_id != UNSET_ID_VALUE.into()
@@ -230,7 +230,7 @@ impl AttributePrototypeArgument {
/// cannot become unset and vice versa.
pub async fn set_external_provider_id_safe(
mut self,
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
external_provider_id: ExternalProviderId,
) -> AttributePrototypeArgumentResult<()> {
if self.external_provider_id != UNSET_ID_VALUE.into()
@@ -256,7 +256,7 @@ impl AttributePrototypeArgument {
/// cannot become unset and vice versa.
pub async fn set_tail_component_id_safe(
mut self,
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
tail_component_id: ComponentId,
) -> AttributePrototypeArgumentResult<()> {
if self.tail_component_id != UNSET_ID_VALUE.into()
@@ -281,7 +281,7 @@ impl AttributePrototypeArgument {
/// cannot become unset and vice versa.
pub async fn set_head_component_id_safe(
mut self,
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
head_component_id: ComponentId,
) -> AttributePrototypeArgumentResult<()> {
if self.head_component_id != UNSET_ID_VALUE.into()
@@ -312,7 +312,7 @@ impl AttributePrototypeArgument {
/// [`AttributePrototype`](crate::AttributePrototype).
#[tracing::instrument(skip(ctx))]
pub async fn list_for_attribute_prototype(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: AttributePrototypeId,
) -> AttributePrototypeArgumentResult<Vec<Self>> {
let rows = ctx
@@ -336,7 +336,7 @@ impl AttributePrototypeArgument {
/// the same "name" sharing the same name.
#[tracing::instrument(skip(ctx))]
pub async fn list_by_name_for_attribute_prototype_and_head_component_id(
- ctx: &DalContext<'_, '_>,
+ ctx: &DalContext<'_, '_, '_>,
attribute_prototype_id: AttributePrototypeId,
head_component_id: ComponentId,
) -> AttributePrototypeArgumentResult<Vec<AttributePrototypeArgumentGroup>> {
