Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

qe: Implement isolation levels for batch transactions #3199

Merged
merged 2 commits into from Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -174,7 +174,7 @@ mod interactive_tx {
];

// Tx flag is not set, but it executes on an ITX.
runner.batch(queries, false).await?;
runner.batch(queries, false, None).await?;
SevInf marked this conversation as resolved.
Show resolved Hide resolved
let res = runner.commit_tx(tx_id.clone()).await?;
assert!(res.is_ok());
runner.clear_active_tx();
Expand All @@ -200,7 +200,7 @@ mod interactive_tx {
];

// Tx flag is not set, but it executes on an ITX.
runner.batch(queries, false).await?;
runner.batch(queries, false, None).await?;
let res = runner.rollback_tx(tx_id.clone()).await?;
assert!(res.is_ok());
runner.clear_active_tx();
Expand Down Expand Up @@ -228,7 +228,7 @@ mod interactive_tx {
];

// Tx flag is not set, but it executes on an ITX.
let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
batch_results.assert_failure(2002, None);

let res = runner.commit_tx(tx_id.clone()).await?;
Expand Down Expand Up @@ -313,6 +313,7 @@ mod interactive_tx {
"{ findManyTestModel { id } }".to_string(),
],
false,
None,
)
.await?
.assert_failure(
Expand Down
Expand Up @@ -25,7 +25,7 @@ mod compound_batch {
r#"query { findUniqueArtist(where: { firstName_lastName: { firstName:"Musti", lastName:"Naukio" }}) { firstName lastName }}"#.to_string()
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}}]}"###
Expand All @@ -44,7 +44,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"Naukio",lastName:"Musti"}}) {firstName lastName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"firstName":"Naukio","lastName":"Musti"}}}]}"###
Expand All @@ -62,7 +62,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"Naukio",lastName:"Musti"}}) {lastName firstName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":{"firstName":"Naukio","lastName":"Musti"}}}]}"###
Expand All @@ -82,7 +82,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"Naukio",lastName:"Musti"}}) {firstName lastName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"firstName":"Naukio","lastName":"Musti"}}}]}"###
Expand All @@ -100,7 +100,7 @@ mod compound_batch {
.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -118,7 +118,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"NO",lastName:"AVAIL"}}) {firstName lastName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":null}}]}"###
Expand Down
Expand Up @@ -35,7 +35,7 @@ mod singlular_batch {

let queries = vec![r#"query { findUniqueArtist(where: { ArtistId: 1 }){ Name }}"#.to_string()];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}}]}"###
Expand All @@ -54,7 +54,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 2 }) { ArtistId, Name }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums","ArtistId":1}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"Name":"ArtistWithOneAlbumWithoutTracks","ArtistId":2}}}]}"###
Expand All @@ -74,7 +74,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 2 }) { Name, ArtistId }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"ArtistId":1,"Name":"ArtistWithoutAlbums"}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"Name":"ArtistWithOneAlbumWithoutTracks","ArtistId":2}}}]}"###
Expand All @@ -93,7 +93,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Albums { AlbumId, Title }}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Albums":[{"AlbumId":2,"Title":"TheAlbumWithoutTracks"}]}}},{"data":{"findUniqueArtist":{"Albums":[]}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -112,7 +112,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Albums(where: { AlbumId: { equals: 2 }}) { AlbumId, Title }}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Albums":[{"AlbumId":2,"Title":"TheAlbumWithoutTracks"}]}}},{"data":{"findUniqueArtist":{"Albums":[]}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -131,7 +131,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Albums(where: { AlbumId: { equals: 2 }}) { AlbumId, Title }}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Albums":[{"AlbumId":2,"Title":"TheAlbumWithoutTracks"}]}}},{"data":{"findUniqueArtist":{"Albums":[]}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -146,7 +146,7 @@ mod singlular_batch {

let queries = vec![r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Name }}"#.to_string()];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -164,7 +164,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420}) { Name }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -182,7 +182,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 1}) { Name }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}},{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}}]}"###
Expand Down
Expand Up @@ -35,7 +35,7 @@ mod transactional {
r#"mutation { createOneModelA(data: { id: 2 }) { id }}"#.to_string(),
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"createOneModelA":{"id":1}}},{"data":{"createOneModelA":{"id":2}}}]}"###
Expand All @@ -51,7 +51,7 @@ mod transactional {
r#"mutation { createOneModelA(data: { id: 1 }) { id }}"#.to_string(),
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
batch_results.assert_failure(2002, None);

insta::assert_snapshot!(
Expand All @@ -78,7 +78,7 @@ mod transactional {
r#"mutation { createOneModelB(data: { id: 1, a: { create: { id: 1 } } }) { id }}"#.to_string(), // ModelB gets created before ModelA because of inlining,
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
batch_results.assert_failure(2002, None);

insta::assert_snapshot!(
Expand All @@ -89,6 +89,41 @@ mod transactional {
Ok(())
}

#[connector_test(exclude(MongoDb))]
async fn valid_isolation_level(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

let batch_results = runner.batch(queries, true, Some("Serializable".into())).await?;

insta::assert_snapshot!(batch_results.to_string(), @r###"{"batchResult":[{"data":{"createOneModelB":{"id":1}}}]}"###);

Ok(())
}

#[connector_test(exclude(MongoDb))]
async fn invalid_isolation_level(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

let batch_results = runner.batch(queries, true, Some("NotALevel".into())).await?;

batch_results.assert_failure(2023, Some("Invalid isolation level `NotALevel`".into()));

Ok(())
}

#[connector_test(only(MongoDb))]
async fn isolation_level_mongo(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

let batch_results = runner.batch(queries, true, Some("Serializable".into())).await?;
batch_results.assert_failure(
2026,
Some("Mongo does not support setting transaction isolation levels".into()),
);

Ok(())
}

// Only postgres for basic testing
#[connector_test(only(Postgres))]
async fn raw_mix(runner: Runner) -> TestResult<()> {
Expand All @@ -109,7 +144,7 @@ mod transactional {
r#"mutation { queryRaw(query: "SELECT * FROM \"ModelC\"", parameters: "[]") }"#.to_string(),
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"createOneModelB":{"id":1}}},{"data":{"executeRaw":1}},{"data":{"queryRaw":[]}}]}"###
Expand Down
Expand Up @@ -37,7 +37,7 @@ mod element {
count
}
}"#.to_string(),
], true).await?.to_string(),
], true, None).await?.to_string(),
@r###"{"batchResult":[{"data":{"executeRaw":0}},{"data":{"updateManyUser":{"count":0}}}]}"###
);

Expand Down
Expand Up @@ -36,7 +36,7 @@ mod regression {
r#"query {findUniqueArtist(where:{firstName_lastName_birth:{firstName:"Sponge",lastName:"Bob", birth: "1999-05-01T00:00:00.000Z"}}) {firstName lastName birth}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Sponge","lastName":"Bob","birth":"1999-05-01T00:00:00.000Z"}}},{"data":{"findUniqueArtist":{"firstName":"Sponge","lastName":"Bob","birth":"1999-05-01T00:00:00.000Z"}}}]}"###
Expand Down
Expand Up @@ -186,7 +186,7 @@ mod raw_mongo {
run_command_raw(
json!({ "insert": "TestModel", "documents": [{ "_id": 2, "field": "B" }, { "_id": 3, "field": "C" }] }),
)
], false).await?.to_string();
], false, None).await?.to_string();

insta::assert_snapshot!(
res,
Expand Down
Expand Up @@ -46,10 +46,16 @@ impl RunnerInterface for BinaryRunner {
Ok(PrismaResponse::Single(gql_response).into())
}

async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<crate::QueryResult> {
async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<crate::QueryResult> {
let query = GraphQlBody::Multi(MultiQuery::new(
queries.into_iter().map(Into::into).collect(),
transaction,
isolation_level,
));

let body = serde_json::to_vec(&query).unwrap();
Expand Down
Expand Up @@ -49,11 +49,17 @@ impl RunnerInterface for DirectRunner {
Ok(handler.handle(query, self.current_tx_id.clone(), None).await.into())
}

async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<crate::QueryResult> {
async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<crate::QueryResult> {
let handler = GraphQlHandler::new(&*self.executor, &self.query_schema);
let query = GraphQlBody::Multi(MultiQuery::new(
queries.into_iter().map(Into::into).collect(),
transaction,
isolation_level,
));

Ok(handler.handle(query, self.current_tx_id.clone(), None).await.into())
Expand Down
Expand Up @@ -22,7 +22,12 @@ pub trait RunnerInterface: Sized {
async fn query(&self, query: String) -> TestResult<QueryResult>;

/// Queries the engine with a batch.
async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<QueryResult>;
async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<QueryResult>;

/// start a transaction for a batch run
async fn start_tx(
Expand Down Expand Up @@ -133,11 +138,16 @@ impl Runner {
}
}

pub async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<QueryResult> {
pub async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<QueryResult> {
match self {
Runner::Direct(r) => r.batch(queries, transaction).await,
Runner::Direct(r) => r.batch(queries, transaction, isolation_level).await,
Runner::NodeApi(_) => todo!(),
Runner::Binary(r) => r.batch(queries, transaction).await,
Runner::Binary(r) => r.batch(queries, transaction, isolation_level).await,
}
}

Expand Down
15 changes: 11 additions & 4 deletions query-engine/core/src/executor/interpreting_executor.rs
@@ -1,6 +1,7 @@
use super::execute_operation::{execute_many_operations, execute_many_self_contained, execute_single_self_contained};
use crate::{
OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager, TransactionError, TransactionManager, TxId,
BatchDocumentTransaction, CoreError, OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager,
TransactionError, TransactionManager, TxId,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -78,21 +79,27 @@ where
&self,
tx_id: Option<TxId>,
operations: Vec<Operation>,
transactional: bool,
transaction: Option<BatchDocumentTransaction>,
SevInf marked this conversation as resolved.
Show resolved Hide resolved
query_schema: QuerySchemaRef,
trace_id: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
if let Some(tx_id) = tx_id {
let batch_isolation_level = transaction.and_then(|t| t.isolation_level());
if batch_isolation_level.is_some() {
return Err(CoreError::UnsupportedFeatureError(
"Can not set batch isolation level within interactive transaction".into(),
));
}
self.itx_manager.batch_execute(&tx_id, operations, trace_id).await
} else if transactional {
} else if let Some(transaction) = transaction {
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let mut conn = self.connector.get_connection().instrument(conn_span).await?;
let mut tx = conn.start_transaction(None).await?;
let mut tx = conn.start_transaction(transaction.isolation_level()).await?;

let results = execute_many_operations(query_schema, tx.as_connection_like(), &operations, trace_id).await;

Expand Down