Skip to content

Commit

Permalink
Merge pull request #3199 from prisma/feat/batch-isolation-level
Browse files Browse the repository at this point in the history
qe: Implement isolation levels for batch transactions
  • Loading branch information
SevInf committed Sep 20, 2022
2 parents f71b104 + 3eb13fb commit 94146ed
Show file tree
Hide file tree
Showing 15 changed files with 142 additions and 53 deletions.
Expand Up @@ -174,7 +174,7 @@ mod interactive_tx {
];

// Tx flag is not set, but it executes on an ITX.
runner.batch(queries, false).await?;
runner.batch(queries, false, None).await?;
let res = runner.commit_tx(tx_id.clone()).await?;
assert!(res.is_ok());
runner.clear_active_tx();
Expand All @@ -200,7 +200,7 @@ mod interactive_tx {
];

// Tx flag is not set, but it executes on an ITX.
runner.batch(queries, false).await?;
runner.batch(queries, false, None).await?;
let res = runner.rollback_tx(tx_id.clone()).await?;
assert!(res.is_ok());
runner.clear_active_tx();
Expand Down Expand Up @@ -228,7 +228,7 @@ mod interactive_tx {
];

// Tx flag is not set, but it executes on an ITX.
let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
batch_results.assert_failure(2002, None);

let res = runner.commit_tx(tx_id.clone()).await?;
Expand Down Expand Up @@ -313,6 +313,7 @@ mod interactive_tx {
"{ findManyTestModel { id } }".to_string(),
],
false,
None,
)
.await?
.assert_failure(
Expand Down
Expand Up @@ -25,7 +25,7 @@ mod compound_batch {
r#"query { findUniqueArtist(where: { firstName_lastName: { firstName:"Musti", lastName:"Naukio" }}) { firstName lastName }}"#.to_string()
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}}]}"###
Expand All @@ -44,7 +44,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"Naukio",lastName:"Musti"}}) {firstName lastName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"firstName":"Naukio","lastName":"Musti"}}}]}"###
Expand All @@ -62,7 +62,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"Naukio",lastName:"Musti"}}) {lastName firstName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":{"firstName":"Naukio","lastName":"Musti"}}}]}"###
Expand All @@ -82,7 +82,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"Naukio",lastName:"Musti"}}) {firstName lastName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"firstName":"Naukio","lastName":"Musti"}}}]}"###
Expand All @@ -100,7 +100,7 @@ mod compound_batch {
.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -118,7 +118,7 @@ mod compound_batch {
r#"query {findUniqueArtist(where:{firstName_lastName:{firstName:"NO",lastName:"AVAIL"}}) {firstName lastName}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Musti","lastName":"Naukio"}}},{"data":{"findUniqueArtist":null}}]}"###
Expand Down
Expand Up @@ -35,7 +35,7 @@ mod singlular_batch {

let queries = vec![r#"query { findUniqueArtist(where: { ArtistId: 1 }){ Name }}"#.to_string()];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}}]}"###
Expand All @@ -54,7 +54,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 2 }) { ArtistId, Name }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums","ArtistId":1}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"Name":"ArtistWithOneAlbumWithoutTracks","ArtistId":2}}}]}"###
Expand All @@ -74,7 +74,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 2 }) { Name, ArtistId }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"ArtistId":1,"Name":"ArtistWithoutAlbums"}}},{"data":{"findUniqueArtist":null}},{"data":{"findUniqueArtist":{"Name":"ArtistWithOneAlbumWithoutTracks","ArtistId":2}}}]}"###
Expand All @@ -93,7 +93,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Albums { AlbumId, Title }}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Albums":[{"AlbumId":2,"Title":"TheAlbumWithoutTracks"}]}}},{"data":{"findUniqueArtist":{"Albums":[]}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -112,7 +112,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Albums(where: { AlbumId: { equals: 2 }}) { AlbumId, Title }}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Albums":[{"AlbumId":2,"Title":"TheAlbumWithoutTracks"}]}}},{"data":{"findUniqueArtist":{"Albums":[]}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -131,7 +131,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Albums(where: { AlbumId: { equals: 2 }}) { AlbumId, Title }}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Albums":[{"AlbumId":2,"Title":"TheAlbumWithoutTracks"}]}}},{"data":{"findUniqueArtist":{"Albums":[]}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -146,7 +146,7 @@ mod singlular_batch {

let queries = vec![r#"query { findUniqueArtist(where: { ArtistId: 420 }) { Name }}"#.to_string()];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -164,7 +164,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 420}) { Name }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}},{"data":{"findUniqueArtist":null}}]}"###
Expand All @@ -182,7 +182,7 @@ mod singlular_batch {
r#"query { findUniqueArtist(where: { ArtistId: 1}) { Name }}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}},{"data":{"findUniqueArtist":{"Name":"ArtistWithoutAlbums"}}}]}"###
Expand Down
Expand Up @@ -35,7 +35,7 @@ mod transactional {
r#"mutation { createOneModelA(data: { id: 2 }) { id }}"#.to_string(),
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"createOneModelA":{"id":1}}},{"data":{"createOneModelA":{"id":2}}}]}"###
Expand All @@ -51,7 +51,7 @@ mod transactional {
r#"mutation { createOneModelA(data: { id: 1 }) { id }}"#.to_string(),
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
batch_results.assert_failure(2002, None);

insta::assert_snapshot!(
Expand All @@ -78,7 +78,7 @@ mod transactional {
r#"mutation { createOneModelB(data: { id: 1, a: { create: { id: 1 } } }) { id }}"#.to_string(), // ModelB gets created before ModelA because of inlining,
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
batch_results.assert_failure(2002, None);

insta::assert_snapshot!(
Expand All @@ -89,6 +89,41 @@ mod transactional {
Ok(())
}

#[connector_test(exclude(MongoDb))]
async fn valid_isolation_level(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

let batch_results = runner.batch(queries, true, Some("Serializable".into())).await?;

insta::assert_snapshot!(batch_results.to_string(), @r###"{"batchResult":[{"data":{"createOneModelB":{"id":1}}}]}"###);

Ok(())
}

#[connector_test(exclude(MongoDb))]
async fn invalid_isolation_level(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

let batch_results = runner.batch(queries, true, Some("NotALevel".into())).await?;

batch_results.assert_failure(2023, Some("Invalid isolation level `NotALevel`".into()));

Ok(())
}

#[connector_test(only(MongoDb))]
async fn isolation_level_mongo(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

let batch_results = runner.batch(queries, true, Some("Serializable".into())).await?;
batch_results.assert_failure(
2026,
Some("Mongo does not support setting transaction isolation levels".into()),
);

Ok(())
}

// Only postgres for basic testing
#[connector_test(only(Postgres))]
async fn raw_mix(runner: Runner) -> TestResult<()> {
Expand All @@ -109,7 +144,7 @@ mod transactional {
r#"mutation { queryRaw(query: "SELECT * FROM \"ModelC\"", parameters: "[]") }"#.to_string(),
];

let batch_results = runner.batch(queries, true).await?;
let batch_results = runner.batch(queries, true, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"createOneModelB":{"id":1}}},{"data":{"executeRaw":1}},{"data":{"queryRaw":[]}}]}"###
Expand Down
Expand Up @@ -37,7 +37,7 @@ mod element {
count
}
}"#.to_string(),
], true).await?.to_string(),
], true, None).await?.to_string(),
@r###"{"batchResult":[{"data":{"executeRaw":0}},{"data":{"updateManyUser":{"count":0}}}]}"###
);

Expand Down
Expand Up @@ -36,7 +36,7 @@ mod regression {
r#"query {findUniqueArtist(where:{firstName_lastName_birth:{firstName:"Sponge",lastName:"Bob", birth: "1999-05-01T00:00:00.000Z"}}) {firstName lastName birth}}"#.to_string(),
];

let batch_results = runner.batch(queries, false).await?;
let batch_results = runner.batch(queries, false, None).await?;
insta::assert_snapshot!(
batch_results.to_string(),
@r###"{"batchResult":[{"data":{"findUniqueArtist":{"firstName":"Sponge","lastName":"Bob","birth":"1999-05-01T00:00:00.000Z"}}},{"data":{"findUniqueArtist":{"firstName":"Sponge","lastName":"Bob","birth":"1999-05-01T00:00:00.000Z"}}}]}"###
Expand Down
Expand Up @@ -186,7 +186,7 @@ mod raw_mongo {
run_command_raw(
json!({ "insert": "TestModel", "documents": [{ "_id": 2, "field": "B" }, { "_id": 3, "field": "C" }] }),
)
], false).await?.to_string();
], false, None).await?.to_string();

insta::assert_snapshot!(
res,
Expand Down
Expand Up @@ -46,10 +46,16 @@ impl RunnerInterface for BinaryRunner {
Ok(PrismaResponse::Single(gql_response).into())
}

async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<crate::QueryResult> {
async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<crate::QueryResult> {
let query = GraphQlBody::Multi(MultiQuery::new(
queries.into_iter().map(Into::into).collect(),
transaction,
isolation_level,
));

let body = serde_json::to_vec(&query).unwrap();
Expand Down
Expand Up @@ -49,11 +49,17 @@ impl RunnerInterface for DirectRunner {
Ok(handler.handle(query, self.current_tx_id.clone(), None).await.into())
}

async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<crate::QueryResult> {
async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<crate::QueryResult> {
let handler = GraphQlHandler::new(&*self.executor, &self.query_schema);
let query = GraphQlBody::Multi(MultiQuery::new(
queries.into_iter().map(Into::into).collect(),
transaction,
isolation_level,
));

Ok(handler.handle(query, self.current_tx_id.clone(), None).await.into())
Expand Down
Expand Up @@ -22,7 +22,12 @@ pub trait RunnerInterface: Sized {
async fn query(&self, query: String) -> TestResult<QueryResult>;

/// Queries the engine with a batch.
async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<QueryResult>;
async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<QueryResult>;

/// start a transaction for a batch run
async fn start_tx(
Expand Down Expand Up @@ -133,11 +138,16 @@ impl Runner {
}
}

pub async fn batch(&self, queries: Vec<String>, transaction: bool) -> TestResult<QueryResult> {
pub async fn batch(
&self,
queries: Vec<String>,
transaction: bool,
isolation_level: Option<String>,
) -> TestResult<QueryResult> {
match self {
Runner::Direct(r) => r.batch(queries, transaction).await,
Runner::Direct(r) => r.batch(queries, transaction, isolation_level).await,
Runner::NodeApi(_) => todo!(),
Runner::Binary(r) => r.batch(queries, transaction).await,
Runner::Binary(r) => r.batch(queries, transaction, isolation_level).await,
}
}

Expand Down
15 changes: 11 additions & 4 deletions query-engine/core/src/executor/interpreting_executor.rs
@@ -1,6 +1,7 @@
use super::execute_operation::{execute_many_operations, execute_many_self_contained, execute_single_self_contained};
use crate::{
OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager, TransactionError, TransactionManager, TxId,
BatchDocumentTransaction, CoreError, OpenTx, Operation, QueryExecutor, ResponseData, TransactionActorManager,
TransactionError, TransactionManager, TxId,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -78,21 +79,27 @@ where
&self,
tx_id: Option<TxId>,
operations: Vec<Operation>,
transactional: bool,
transaction: Option<BatchDocumentTransaction>,
query_schema: QuerySchemaRef,
trace_id: Option<String>,
) -> crate::Result<Vec<crate::Result<ResponseData>>> {
if let Some(tx_id) = tx_id {
let batch_isolation_level = transaction.and_then(|t| t.isolation_level());
if batch_isolation_level.is_some() {
return Err(CoreError::UnsupportedFeatureError(
"Can not set batch isolation level within interactive transaction".into(),
));
}
self.itx_manager.batch_execute(&tx_id, operations, trace_id).await
} else if transactional {
} else if let Some(transaction) = transaction {
let connection_name = self.connector.name();
let conn_span = info_span!(
"prisma:engine:connection",
user_facing = true,
"db.type" = connection_name.as_str()
);
let mut conn = self.connector.get_connection().instrument(conn_span).await?;
let mut tx = conn.start_transaction(None).await?;
let mut tx = conn.start_transaction(transaction.isolation_level()).await?;

let results = execute_many_operations(query_schema, tx.as_connection_like(), &operations, trace_id).await;

Expand Down

0 comments on commit 94146ed

Please sign in to comment.