Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qe: sync @default(now()) and @updatedAt within a request #3200

Merged
merged 1 commit into from Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion flake.nix
Expand Up @@ -34,7 +34,7 @@
"mobc-0.7.3" = "sha256-88jSFqOyMy2E7TP1HtMcE4CQXoKhBpO8XuSFKGtfgqA=";
"mysql_async-0.30.0" = "sha256-I1Q9G3H3BW/Paq9aOYGcxQf4JVwN/ZNhGuHwTqbuxWc=";
"postgres-native-tls-0.5.0" = "sha256-kwqHalfwrvNQYUdAqObTAab3oWzBLl6hab2JGXVyJ3k=";
"quaint-0.2.0-alpha.13" = "sha256-8vaYhXSFTibNLt+yTj+RQepzJ8CzWthhgQXVtSl+DhI=";
"quaint-0.2.0-alpha.13" = "sha256-kCbxrImKb5VqSjC+W/qZpS7sHSbepayZMId+TpReU5k=";
"tokio-native-tls-0.3.0" = "sha256-ayH3TJ1iUQeZicR2nrsuxLykMoPL1fYBqRb21ValR5Q=";
};
};
Expand Down
@@ -1,6 +1,7 @@
mod max_integer;
mod prisma_10098;
mod prisma_10935;
mod prisma_12572;
mod prisma_12929;
mod prisma_13097;
mod prisma_14001;
Expand Down
@@ -0,0 +1,59 @@
use query_engine_tests::*;

#[test_suite(schema(schema))]
mod prisma_12572 {
fn schema() -> String {
r#"
model Test1 {
#id(id, String, @id)
up1 DateTime @updatedAt
cr1 DateTime @default(now())
cr2 DateTime @default(now())
up2 DateTime @updatedAt
test2s Test2[]
}

model Test2 {
#id(id, String, @id)
test1Id String @unique
test1 Test1 @relation(fields: [test1Id], references: [id])
cr DateTime @default(now())
up DateTime @updatedAt
}
"#
.to_owned()
}

#[connector_test]
async fn all_generated_timestamps_are_the_same(runner: Runner) -> TestResult<()> {
runner
.query(r#"mutation { createOneTest1(data: {id:"one", test2s: { create: {id: "two"}}}) { id }}"#)
.await?
.assert_success();
let testones = runner.query(r#"{ findManyTest1 { id up1 cr1 cr2 up2 } }"#).await?;
let testtwos = runner.query(r#"{ findManyTest2 { id up cr } }"#).await?;
testones.assert_success();
testtwos.assert_success();

let testones_json = testones.to_json_value();
let testtwos_json = testtwos.to_json_value();
let testone_obj = &testones_json["data"]["findManyTest1"][0];
let testtwo_obj = &testtwos_json["data"]["findManyTest2"][0];

let values = &[
&testone_obj["up1"].as_str().unwrap(),
&testone_obj["up2"].as_str().unwrap(),
&testone_obj["cr1"].as_str().unwrap(),
&testone_obj["cr2"].as_str().unwrap(),
&testtwo_obj["up"].as_str().unwrap(),
&testtwo_obj["cr"].as_str().unwrap(),
];

// assert that all the datetimes are the same
for datetimes in values.windows(2) {
tomhoule marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(datetimes[0], datetimes[1]);
}

Ok(())
}
}
35 changes: 14 additions & 21 deletions query-engine/connectors/query-connector/src/write_args.rs
Expand Up @@ -2,16 +2,16 @@ use crate::{
error::{ConnectorError, ErrorKind},
Filter,
};
use chrono::Utc;
use indexmap::{map::Keys, IndexMap};
use prisma_models::{
CompositeFieldRef, Field, ModelProjection, ModelRef, PrismaValue, ScalarFieldRef, SelectedField, SelectionResult,
};
use std::{borrow::Borrow, convert::TryInto, ops::Deref};

/// WriteArgs represent data to be written to an underlying data source.
#[derive(Debug, PartialEq, Clone, Default)]
#[derive(Debug, PartialEq, Clone)]
pub struct WriteArgs {
pub request_now: PrismaValue,
pub args: IndexMap<DatasourceFieldName, WriteOperation>,
}

Expand Down Expand Up @@ -352,24 +352,17 @@ impl TryInto<PrismaValue> for WriteOperation {
}
}

impl From<IndexMap<DatasourceFieldName, WriteOperation>> for WriteArgs {
fn from(args: IndexMap<DatasourceFieldName, WriteOperation>) -> Self {
Self { args }
impl WriteArgs {
pub fn new(args: IndexMap<DatasourceFieldName, WriteOperation>, request_now: PrismaValue) -> Self {
Self { args, request_now }
}
}

impl From<Vec<(DatasourceFieldName, WriteOperation)>> for WriteArgs {
fn from(pairs: Vec<(DatasourceFieldName, WriteOperation)>) -> Self {
pub fn new_empty(request_now: PrismaValue) -> Self {
Self {
args: pairs.into_iter().collect(),
args: Default::default(),
request_now,
}
}
}

impl WriteArgs {
pub fn new() -> Self {
Self { args: IndexMap::new() }
}

pub fn insert<T, V>(&mut self, key: T, arg: V)
where
Expand Down Expand Up @@ -404,19 +397,19 @@ impl WriteArgs {
}

pub fn add_datetimes(&mut self, model: &ModelRef) {
let now = PrismaValue::DateTime(Utc::now().into());
let updated_at_fields = model.fields().updated_at();
let value = &self.request_now;

for f in updated_at_fields {
if self.args.get(f.db_name()).is_none() {
self.args.insert(f.into(), WriteOperation::scalar_set(now.clone()));
self.args.insert(f.into(), WriteOperation::scalar_set(value.clone()));
}
}
}

pub fn update_datetimes(&mut self, model: ModelRef) {
pub fn update_datetimes(&mut self, model: &ModelRef) {
if !self.args.is_empty() {
self.add_datetimes(&model)
self.add_datetimes(model)
}
}

Expand Down Expand Up @@ -450,7 +443,7 @@ impl WriteArgs {
/// Picks all arguments out of `args` that are updating a value for a field
/// contained in `projection`, as those need to be merged into the records later on.
pub fn pick_args(projection: &ModelProjection, args: &WriteArgs) -> WriteArgs {
let pairs: Vec<_> = projection
let pairs = projection
.scalar_fields()
.into_iter()
.filter_map(|field| {
Expand All @@ -459,7 +452,7 @@ pub fn pick_args(projection: &ModelProjection, args: &WriteArgs) -> WriteArgs {
})
.collect();

WriteArgs::from(pairs)
WriteArgs::new(pairs, args.request_now.clone())
}

/// Merges the incoming write argument values into the given, already loaded, ids. Overwrites existing values.
Expand Down
Expand Up @@ -103,10 +103,6 @@ where
let col = sf.db_name().to_string();

let column = Column::from((full_table_name, col)).type_family(sf.type_family());

match sf.default_value.as_ref().and_then(|d| d.get()) {
Some(default) => column.default(sf.value(default)),
None => column.default(quaint::ast::DefaultValue::Generated),
}
column.default(quaint::ast::DefaultValue::Generated)
}
}
Expand Up @@ -9,7 +9,7 @@ use tracing::Span;

/// `INSERT` a new record to the database. Resulting an `INSERT` ast and an
/// optional `RecordProjection` if available from the arguments or model.
pub fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option<String>) -> Insert<'static> {
pub(crate) fn create_record(model: &ModelRef, mut args: WriteArgs, trace_id: Option<String>) -> Insert<'static> {
tomhoule marked this conversation as resolved.
Show resolved Hide resolved
let fields: Vec<_> = model
.fields()
.scalar()
Expand Down
97 changes: 56 additions & 41 deletions query-engine/core/src/executor/interpreting_executor.rs
Expand Up @@ -52,13 +52,16 @@ where
if let Some(tx_id) = tx_id {
self.itx_manager.execute(&tx_id, operation, trace_id).await
} else {
execute_single_self_contained(
&self.connector,
query_schema,
operation,
trace_id,
self.force_transactions,
)
super::with_request_now(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is the best way for now, I'm a bit worried though somebody adds a new operation, forgets this and we find it out way too late.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As noted above, it will panic very early and unambiguously. One test with an @default(now()) that uses the new method is coverage enough. We both agree it's not the ideal solution, but I'm not too worried about this specific problem.

execute_single_self_contained(
&self.connector,
query_schema,
operation,
trace_id,
self.force_transactions,
)
.await
})
.await
}
}
Expand Down Expand Up @@ -101,7 +104,13 @@ where
let mut conn = self.connector.get_connection().instrument(conn_span).await?;
let mut tx = conn.start_transaction(transaction.isolation_level()).await?;

let results = execute_many_operations(query_schema, tx.as_connection_like(), &operations, trace_id).await;
let results = super::with_request_now(execute_many_operations(
query_schema,
tx.as_connection_like(),
&operations,
trace_id,
))
.await;

if results.is_err() {
tx.rollback().await?;
Expand All @@ -111,13 +120,16 @@ where

results
} else {
execute_many_self_contained(
&self.connector,
query_schema,
&operations,
trace_id,
self.force_transactions,
)
super::with_request_now(async move {
execute_many_self_contained(
&self.connector,
query_schema,
&operations,
trace_id,
self.force_transactions,
)
.await
})
.await
}
}
Expand All @@ -139,35 +151,38 @@ where
valid_for_millis: u64,
isolation_level: Option<String>,
) -> crate::Result<TxId> {
let id = TxId::default();
trace!("[{}] Starting...", id);
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
.await;

let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, isolation_level).await?;

self.itx_manager
.create_tx(
query_schema.clone(),
id.clone(),
c_tx,
Duration::from_millis(valid_for_millis),
super::with_request_now(async move {
let id = TxId::default();
trace!("[{}] Starting...", id);
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let conn = time::timeout(
Duration::from_millis(max_acquisition_millis),
self.connector.get_connection(),
)
.instrument(conn_span)
.await;

debug!("[{}] Started.", id);
Ok(id)
let conn = conn.map_err(|_| TransactionError::AcquisitionTimeout)??;
let c_tx = OpenTx::start(conn, isolation_level).await?;

self.itx_manager
.create_tx(
query_schema.clone(),
id.clone(),
c_tx,
Duration::from_millis(valid_for_millis),
)
.await;

debug!("[{}] Started.", id);
Ok(id)
})
.await
}

async fn commit_tx(&self, tx_id: TxId) -> crate::Result<()> {
Expand Down
31 changes: 30 additions & 1 deletion query-engine/core/src/executor/mod.rs
Expand Up @@ -18,7 +18,6 @@ use crate::{
};
use async_trait::async_trait;
use connector::Connector;

use tracing::Dispatch;

#[async_trait]
Expand Down Expand Up @@ -90,3 +89,33 @@ pub trait TransactionManager {
pub fn get_current_dispatcher() -> Dispatch {
tracing::dispatcher::get_default(|current| current.clone())
}

tokio::task_local! {
static REQUEST_NOW: prisma_value::PrismaValue;
}

/// A timestamp that should be the `NOW()` value for the whole duration of a request. So all
/// `@default(now())` and `@updatedAt` should use it.
///
/// That panics if REQUEST_NOW has not been set with with_request_now().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is cool actually. Better to panic than write a wrong date in a new operation!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only problem I see is somebody adds a new operation, does not test it with now() or updatedAt and the panic is triggered by a user.

///
/// If we had a query context we carry for all the lifetime of the query, it would belong there.
pub(crate) fn get_request_now() -> prisma_value::PrismaValue {
tomhoule marked this conversation as resolved.
Show resolved Hide resolved
REQUEST_NOW.with(|rn| rn.clone())
}

/// Execute a future with the current "now" timestamp that can be retrieved through
/// `get_request_now()`, initializing it if necessary.
pub(crate) async fn with_request_now<F, R>(fut: F) -> R
tomhoule marked this conversation as resolved.
Show resolved Hide resolved
where
F: std::future::Future<Output = R>,
{
let is_set = REQUEST_NOW.try_with(|_| async {}).is_ok();

if is_set {
fut.await
} else {
let now = prisma_value::PrismaValue::DateTime(chrono::Utc::now().into());
REQUEST_NOW.scope(now, fut).await
}
}
4 changes: 2 additions & 2 deletions query-engine/core/src/interactive_transactions/actors.rs
Expand Up @@ -272,7 +272,7 @@ pub fn spawn_itx_actor(
span.record("itx_id", &tx_id_str.as_str());

tokio::task::spawn(
async move {
crate::executor::with_request_now(async move {
let sleep = time::sleep(timeout);
tokio::pin!(sleep);

Expand Down Expand Up @@ -300,7 +300,7 @@ pub fn spawn_itx_actor(
let _ = send_done.send(server.id.clone()).await;

trace!("[{}] has stopped with {}", server.id.to_string(), server.cached_tx);
}
})
.instrument(span)
.with_subscriber(dispatcher),
);
Expand Down
2 changes: 1 addition & 1 deletion query-engine/core/src/query_ast/write.rs
Expand Up @@ -36,7 +36,7 @@ impl WriteQuery {
)
}

args.update_datetimes(model);
args.update_datetimes(&model);
}

pub fn returns(&self, field_selection: &FieldSelection) -> bool {
Expand Down