Skip to content

Commit

Permalink
qe: sync @default(now()) and @updatedAt within a request
Browse files Browse the repository at this point in the history
There are two points of difficulty here:

- Using the same PrismaValue::DateTime as the value of `now()` and
  `@updatedAt` for the duration of a request. We achieve that through a
  task local.
  Alternative: passing the context through explicitly. I tried it; it's a
  massive refactoring.
- The loss of precision due to the `From<PrismaValue>` impl for
  `QueryValue`. This is fixed by adding a `QueryValue::DateTime` variant
  that preserves the original precision.
  • Loading branch information
tomhoule committed Sep 19, 2022
1 parent fb56bfe commit 46e1d7b
Show file tree
Hide file tree
Showing 17 changed files with 207 additions and 115 deletions.
2 changes: 1 addition & 1 deletion flake.nix
Expand Up @@ -34,7 +34,7 @@
"mobc-0.7.3" = "sha256-88jSFqOyMy2E7TP1HtMcE4CQXoKhBpO8XuSFKGtfgqA=";
"mysql_async-0.30.0" = "sha256-I1Q9G3H3BW/Paq9aOYGcxQf4JVwN/ZNhGuHwTqbuxWc=";
"postgres-native-tls-0.5.0" = "sha256-kwqHalfwrvNQYUdAqObTAab3oWzBLl6hab2JGXVyJ3k=";
"quaint-0.2.0-alpha.13" = "sha256-8vaYhXSFTibNLt+yTj+RQepzJ8CzWthhgQXVtSl+DhI=";
"quaint-0.2.0-alpha.13" = "sha256-kCbxrImKb5VqSjC+W/qZpS7sHSbepayZMId+TpReU5k=";
"tokio-native-tls-0.3.0" = "sha256-ayH3TJ1iUQeZicR2nrsuxLykMoPL1fYBqRb21ValR5Q=";
};
};
Expand Down
@@ -1,6 +1,7 @@
mod max_integer;
mod prisma_10098;
mod prisma_10935;
mod prisma_12572;
mod prisma_12929;
mod prisma_13097;
mod prisma_14001;
Expand Down
@@ -0,0 +1,59 @@
use query_engine_tests::*;

// Regression test for prisma#12572: every `@default(now())` and `@updatedAt`
// value generated within a single request must share one timestamp.
#[test_suite(schema(schema))]
mod prisma_12572 {
// Two related models, each mixing `@updatedAt` and `@default(now())` columns,
// so a single nested create generates timestamps on both models at once.
fn schema() -> String {
r#"
model Test1 {
id String @id
up1 DateTime @updatedAt
cr1 DateTime @default(now())
cr2 DateTime @default(now())
up2 DateTime @updatedAt
test2s Test2[]
}
model Test2 {
id String @id
test1Id String @unique
test1 Test1 @relation(fields: [test1Id], references: [id])
cr DateTime @default(now())
up DateTime @updatedAt
}
"#
.to_owned()
}

// Creates a `Test1` with a nested `Test2` in one request, reads back all six
// generated datetime fields, and asserts they are string-identical.
#[connector_test]
async fn all_generated_timestamps_are_the_same(runner: Runner) -> TestResult<()> {
runner
.query(r#"mutation { createOneTest1(data: {id:"one", test2s: { create: {id: "two"}}}) { id }}"#)
.await?
.assert_success();
let testones = runner.query(r#"{ findManyTest1 { id up1 cr1 cr2 up2 } }"#).await?;
let testtwos = runner.query(r#"{ findManyTest2 { id up cr } }"#).await?;
testones.assert_success();
testtwos.assert_success();

let testones_json = testones.to_json_value();
let testtwos_json = testtwos.to_json_value();
let testone_obj = &testones_json["data"]["findManyTest1"][0];
let testtwo_obj = &testtwos_json["data"]["findManyTest2"][0];

// All six generated datetime strings, drawn from both models.
let values = &[
&testone_obj["up1"].as_str().unwrap(),
&testone_obj["up2"].as_str().unwrap(),
&testone_obj["cr1"].as_str().unwrap(),
&testone_obj["cr2"].as_str().unwrap(),
&testtwo_obj["up"].as_str().unwrap(),
&testtwo_obj["cr"].as_str().unwrap(),
];

// assert that all the datetimes are the same
// (pairwise equality over adjacent elements implies global equality)
for datetimes in values.windows(2) {
assert_eq!(datetimes[0], datetimes[1]);
}

Ok(())
}
}
35 changes: 14 additions & 21 deletions query-engine/connectors/query-connector/src/write_args.rs
Expand Up @@ -2,16 +2,16 @@ use crate::{
error::{ConnectorError, ErrorKind},
Filter,
};
use chrono::Utc;
use indexmap::{map::Keys, IndexMap};
use prisma_models::{
CompositeFieldRef, Field, ModelProjection, ModelRef, PrismaValue, ScalarFieldRef, SelectedField, SelectionResult,
};
use std::{borrow::Borrow, convert::TryInto, ops::Deref};

/// WriteArgs represent data to be written to an underlying data source.
#[derive(Debug, PartialEq, Clone, Default)]
#[derive(Debug, PartialEq, Clone)]
pub struct WriteArgs {
pub request_now: PrismaValue,
pub args: IndexMap<DatasourceFieldName, WriteOperation>,
}

Expand Down Expand Up @@ -352,24 +352,17 @@ impl TryInto<PrismaValue> for WriteOperation {
}
}

impl From<IndexMap<DatasourceFieldName, WriteOperation>> for WriteArgs {
fn from(args: IndexMap<DatasourceFieldName, WriteOperation>) -> Self {
Self { args }
impl WriteArgs {
pub fn new(args: IndexMap<DatasourceFieldName, WriteOperation>, request_now: PrismaValue) -> Self {
Self { args, request_now }
}
}

impl From<Vec<(DatasourceFieldName, WriteOperation)>> for WriteArgs {
fn from(pairs: Vec<(DatasourceFieldName, WriteOperation)>) -> Self {
pub fn new_empty(request_now: PrismaValue) -> Self {
Self {
args: pairs.into_iter().collect(),
args: Default::default(),
request_now,
}
}
}

impl WriteArgs {
pub fn new() -> Self {
Self { args: IndexMap::new() }
}

pub fn insert<T, V>(&mut self, key: T, arg: V)
where
Expand Down Expand Up @@ -404,19 +397,19 @@ impl WriteArgs {
}

pub fn add_datetimes(&mut self, model: &ModelRef) {
let now = PrismaValue::DateTime(Utc::now().into());
let updated_at_fields = model.fields().updated_at();
let value = &self.request_now;

for f in updated_at_fields {
if self.args.get(f.db_name()).is_none() {
self.args.insert(f.into(), WriteOperation::scalar_set(now.clone()));
self.args.insert(f.into(), WriteOperation::scalar_set(value.clone()));
}
}
}

pub fn update_datetimes(&mut self, model: ModelRef) {
pub fn update_datetimes(&mut self, model: &ModelRef) {
if !self.args.is_empty() {
self.add_datetimes(&model)
self.add_datetimes(model)
}
}

Expand Down Expand Up @@ -450,7 +443,7 @@ impl WriteArgs {
/// Picks all arguments out of `args` that are updating a value for a field
/// contained in `projection`, as those need to be merged into the records later on.
pub fn pick_args(projection: &ModelProjection, args: &WriteArgs) -> WriteArgs {
let pairs: Vec<_> = projection
let pairs = projection
.scalar_fields()
.into_iter()
.filter_map(|field| {
Expand All @@ -459,7 +452,7 @@ pub fn pick_args(projection: &ModelProjection, args: &WriteArgs) -> WriteArgs {
})
.collect();

WriteArgs::from(pairs)
WriteArgs::new(pairs, args.request_now.clone())
}

/// Merges the incoming write argument values into the given, already loaded, ids. Overwrites existing values.
Expand Down
Expand Up @@ -103,10 +103,6 @@ where
let col = sf.db_name().to_string();

let column = Column::from((full_table_name, col)).type_family(sf.type_family());

match sf.default_value.as_ref().and_then(|d| d.get()) {
Some(default) => column.default(sf.value(default)),
None => column.default(quaint::ast::DefaultValue::Generated),
}
column.default(quaint::ast::DefaultValue::Generated)
}
}
Expand Up @@ -9,7 +9,7 @@ use tracing::Span;

/// `INSERT` a new record into the database, returning an `INSERT` AST and an
/// optional `RecordProjection` if available from the arguments or model.
pub fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option<String>) -> Insert<'static> {
pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option<String>) -> Insert<'static> {
let fields: Vec<_> = model
.fields()
.scalar()
Expand Down
130 changes: 71 additions & 59 deletions query-engine/core/src/executor/interpreting_executor.rs
Expand Up @@ -51,13 +51,16 @@ where
if let Some(tx_id) = tx_id {
self.itx_manager.execute(&tx_id, operation, trace_id).await
} else {
execute_single_self_contained(
&self.connector,
query_schema,
operation,
trace_id,
self.force_transactions,
)
super::with_request_now(async move {
execute_single_self_contained(
&self.connector,
query_schema,
operation,
trace_id,
self.force_transactions,
)
.await
})
.await
}
}
Expand All @@ -84,33 +87,39 @@ where
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
if let Some(tx_id) = tx_id {
self.itx_manager.batch_execute(&tx_id, operations, trace_id).await
} else if transactional {
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let mut conn = self.connector.get_connection().instrument(conn_span).await?;
let mut tx = conn.start_transaction(None).await?;

let results = execute_many_operations(query_schema, tx.as_connection_like(), &operations, trace_id).await;

if results.is_err() {
tx.rollback().await?;
} else {
tx.commit().await?;
}

results
} else {
execute_many_self_contained(
&self.connector,
query_schema,
&operations,
trace_id,
self.force_transactions,
)
super::with_request_now(async move {
if transactional {
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let mut conn = self.connector.get_connection().instrument(conn_span).await?;
let mut tx = conn.start_transaction(None).await?;

let results =
execute_many_operations(query_schema, tx.as_connection_like(), &operations, trace_id).await;

if results.is_err() {
tx.rollback().await?;
} else {
tx.commit().await?;
}

results
} else {
execute_many_self_contained(
&self.connector,
query_schema,
&operations,
trace_id,
self.force_transactions,
)
.await
}
})
.await
}
}
Expand All @@ -132,35 +141,38 @@ where
valid_for_millis: u64,
isolation_level: Option<String>,
) -> crate::Result<TxId> {
let id = TxId::default();
trace!("[{}] Starting...", id);
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
.await;

let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, isolation_level).await?;

self.itx_manager
.create_tx(
query_schema.clone(),
id.clone(),
c_tx,
Duration::from_millis(valid_for_millis),
super::with_request_now(async move {
let id = TxId::default();
trace!("[{}] Starting...", id);
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
.await;

debug!("[{}] Started.", id);
Ok(id)
let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, isolation_level).await?;

self.itx_manager
.create_tx(
query_schema.clone(),
id.clone(),
c_tx,
Duration::from_millis(valid_for_millis),
)
.await;

debug!("[{}] Started.", id);
Ok(id)
})
.await
}

async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()> {
Expand Down
25 changes: 24 additions & 1 deletion query-engine/core/src/executor/mod.rs
Expand Up @@ -16,7 +16,6 @@ pub use loader::*;
use crate::{query_document::Operation, response_ir::ResponseData, schema::QuerySchemaRef, TxId};
use async_trait::async_trait;
use connector::Connector;

use tracing::Dispatch;

#[async_trait]
Expand Down Expand Up @@ -88,3 +87,27 @@ pub trait TransactionManager {
pub fn get_current_dispatcher() -> Dispatch {
tracing::dispatcher::get_default(|current| current.clone())
}

// Task-local storage for the `now()` timestamp shared across a whole request.
// It is scoped by `with_request_now()` and read through `get_request_now()`.
tokio::task_local! {
static REQUEST_NOW: prisma_value::PrismaValue;
}

/// Returns the shared `NOW()` timestamp for the duration of the current
/// request, so that all `@default(now())` and `@updatedAt` values agree.
///
/// # Panics
///
/// Panics if called outside a `with_request_now` scope (the task local is
/// unset in that case).
pub(crate) fn get_request_now() -> prisma_value::PrismaValue {
    REQUEST_NOW.with(Clone::clone)
}

/// Runs `fut` with the task-local `REQUEST_NOW` timestamp in scope, so that
/// every `@default(now())` and `@updatedAt` value produced while the future
/// runs uses a single `PrismaValue::DateTime`.
///
/// If an enclosing scope already set the timestamp (nested call), the existing
/// value is kept so the whole request stays consistent; otherwise the current
/// UTC time is captured once and used for the entire scope.
pub(crate) async fn with_request_now<F, R>(fut: F) -> R
where
    F: std::future::Future<Output = R>,
{
    // Probe whether an enclosing scope already initialized the task local.
    // `|_| ()` suffices here — the original `|_| async {}` built a future
    // that was never awaited just to perform this check.
    let is_set = REQUEST_NOW.try_with(|_| ()).is_ok();

    if is_set {
        fut.await
    } else {
        let now = prisma_value::PrismaValue::DateTime(chrono::Utc::now().into());
        REQUEST_NOW.scope(now, fut).await
    }
}
4 changes: 2 additions & 2 deletions query-engine/core/src/interactive_transactions/actors.rs
Expand Up @@ -272,7 +272,7 @@ pub fn spawn_itx_actor(
span.record("itx_id", &tx_id_str.as_str());

tokio::task::spawn(
async move {
crate::executor::with_request_now(async move {
let sleep = time::sleep(timeout);
tokio::pin!(sleep);

Expand Down Expand Up @@ -300,7 +300,7 @@ pub fn spawn_itx_actor(
let _ = send_done.send(server.id.clone()).await;

trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx);
}
})
.instrument(span)
.with_subscriber(dispatcher),
);
Expand Down

0 comments on commit 46e1d7b

Please sign in to comment.