Skip to content

Commit

Permalink
feat(client): Implement isolation levels for bathc transactions
Browse files Browse the repository at this point in the history
Engines part: prisma/prisma-engines#3199

Batch `$transaction` call will now receive optional options
objects with `isolationLevel` property, allowing to pick any level,
supporting by current transaction. Internally, refactors a bunch of
things about the way transaction info is stored in `PrismaPromise`,
`Request` and `InternalRequest` types.

- All transaction-related properties are now grouped under `transaction`
  key. Distiction between iTx and batch transaction is done explcitly
  via discriminated union type.
- `runInTransaction` property is removed. Empty `transaction` field now
  means that request is not running in transaction.

  For testing we are checking that engine correctly generates corresponding SQL.
  Properly testing different isolation level effects would be very
  difficult, especially in case of batch transactions, where we can not
  execute any code between the queries.

Ref #9678
  • Loading branch information
SevInf committed Sep 19, 2022
1 parent 7a13f55 commit d88c2fd
Show file tree
Hide file tree
Showing 18 changed files with 300 additions and 81 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7 changes: 6 additions & 1 deletion packages/client/src/generation/TSClient/PrismaClient.ts
Expand Up @@ -11,6 +11,11 @@ import { Datasources } from './Datasources'
import type { Generatable } from './Generatable'

function batchingTransactionDefinition(this: PrismaClientClass) {
const args = ['arg: [...P]']
if (this.dmmf.hasEnumInNamespace('TransactionIsolationLevel', 'prisma')) {
args.push('options?: { isolationLevel?: Prisma.TransactionIsolationLevel }')
}
const argsString = args.join(', ')
return `
/**
* Allows the running of a sequence of read/write operations that are guaranteed to either succeed or fail as a whole.
Expand All @@ -25,7 +30,7 @@ function batchingTransactionDefinition(this: PrismaClientClass) {
*
* Read more in our [docs](https://www.prisma.io/docs/concepts/components/prisma-client/transactions).
*/
$transaction<P extends PrismaPromise<any>[]>(arg: [...P]): Promise<UnwrapTuple<P>>;`
$transaction<P extends PrismaPromise<any>[]>(${argsString}): Promise<UnwrapTuple<P>>;`
}

function interactiveTransactionDefinition(this: PrismaClientClass) {
Expand Down
43 changes: 22 additions & 21 deletions packages/client/src/runtime/RequestHandler.ts
Expand Up @@ -9,6 +9,7 @@ import {
PrismaClientRustPanicError,
PrismaClientUnknownRequestError,
} from '.'
import { PrismaPromiseTransaction } from './core/request/PrismaPromise'
import { DataLoader } from './DataLoader'
import type { Client, Unpacker } from './getPrismaClient'
import type { EngineMiddleware } from './MiddlewareHandler'
Expand All @@ -30,11 +31,10 @@ export type RequestParams = {
clientMethod: string
callsite?: CallSite
rejectOnNotFound?: RejectOnNotFound
runInTransaction?: boolean
transaction?: PrismaPromiseTransaction
engineHook?: EngineMiddleware
args: any
headers?: Record<string, string>
transactionId?: string | number
unpacker?: Unpacker
otelParentCtx?: Context
otelChildCtx?: Context
Expand All @@ -48,29 +48,30 @@ export type HandleErrorParams = {

export type Request = {
document: Document
runInTransaction?: boolean
transactionId?: string | number
transaction?: PrismaPromiseTransaction
headers?: Record<string, string>
otelParentCtx?: Context
otelChildCtx?: Context
tracingConfig?: TracingConfig
}

function getRequestInfo(request: Request) {
const txId = request.transactionId
const inTx = request.runInTransaction
const transaction = request.transaction
const headers = request.headers ?? {}
const traceparent = getTraceParent({ tracingConfig: request.tracingConfig })

// if the tx has a number for an id, then it's a regular batch tx
const _inTx = typeof txId === 'number' && inTx ? true : undefined
// if the tx has a string for id, it's an interactive transaction
const _txId = typeof txId === 'string' && inTx ? txId : undefined
if (transaction?.kind === 'itx') {
headers.transactionId = transaction.id
}

if (_txId !== undefined) headers.transactionId = _txId
if (traceparent !== undefined) headers.traceparent = traceparent
if (traceparent !== undefined) {
headers.traceparent = traceparent
}

return { inTx: _inTx, headers }
return {
batchTransaction: transaction?.kind === 'batch' ? transaction : undefined,
headers,
}
}

export class RequestHandler {
Expand All @@ -91,7 +92,9 @@ export class RequestHandler {
// TODO: pass the child information to QE for it to issue links to queries
// const links = requests.map((r) => trace.getSpanContext(r.otelChildCtx!))

return this.client._engine.requestBatch(queries, info.headers, info.inTx)
return this.client._engine.requestBatch(queries, info.headers, {
isolationLevel: info.batchTransaction?.isolationLevel,
})
},
singleLoader: (request) => {
const info = getRequestInfo(request)
Expand All @@ -100,8 +103,8 @@ export class RequestHandler {
return this.client._engine.request(query, info.headers)
},
batchBy: (request) => {
if (request.transactionId) {
return `transaction-${request.transactionId}`
if (request.transaction?.id) {
return `transaction-${request.transaction.id}`
}

return batchFindUniqueBy(request)
Expand All @@ -118,11 +121,10 @@ export class RequestHandler {
callsite,
rejectOnNotFound,
clientMethod,
runInTransaction,
engineHook,
args,
headers,
transactionId,
transaction,
unpacker,
otelParentCtx,
otelChildCtx,
Expand All @@ -149,7 +151,7 @@ export class RequestHandler {
const result = await engineHook(
{
document,
runInTransaction,
runInTransaction: Boolean(transaction),
},
(params) => this.dataloader.request({ ...params, tracingConfig: this.client._tracingConfig }),
)
Expand All @@ -158,9 +160,8 @@ export class RequestHandler {
} else {
const result = await this.dataloader.request({
document,
runInTransaction,
headers,
transactionId,
transaction,
otelParentCtx,
otelChildCtx,
tracingConfig: this.client._tracingConfig,
Expand Down
28 changes: 21 additions & 7 deletions packages/client/src/runtime/core/model/applyModel.ts
Expand Up @@ -61,13 +61,27 @@ export function applyModel(client: Client, dmmfModelName: string) {
const action = (paramOverrides: O.Optional<InternalRequestParams>) => (userArgs?: UserArgs) => {
const callSite = getCallSite(client._errorFormat) // used for showing better errors

return createPrismaPromise((txId, lock) => {
const data = { args: userArgs, dataPath: [] } // data and its dataPath for nested results
const action = { action: dmmfActionName, model: dmmfModelName } // action name and its related model
const method = { clientMethod: `${jsModelName}.${prop}`, jsModelName } // method name for display only
const tx = { runInTransaction: !!txId, transactionId: txId, lock } // transaction information
const trace = { callsite: callSite } // stack trace
const params = { ...data, ...action, ...method, ...tx, ...trace }
return createPrismaPromise((transaction, lock) => {
const params: InternalRequestParams = {
// data and its dataPath for nested results
args: userArgs,
dataPath: [],

// action name and its related model
action: dmmfActionName,
model: dmmfModelName,

// method name for display only
clientMethod: `${jsModelName}.${prop}`,
jsModelName,

// transaction information
transaction,
lock,

// stack trace
callsite: callSite,
}

return requestFn({ ...params, ...paramOverrides })
})
Expand Down
24 changes: 22 additions & 2 deletions packages/client/src/runtime/core/request/PrismaPromise.ts
@@ -1,3 +1,23 @@
import { IsolationLevel } from '@prisma/engine-core'

export type PrismaPromiseBatchTransaction = {
kind: 'batch'
id: number
isolationLevel?: IsolationLevel
}

export type PrismaPromiseInteractiveTransaction = {
kind: 'itx'
id: string
}

export type PrismaPromiseTransaction = PrismaPromiseBatchTransaction | PrismaPromiseInteractiveTransaction

export type RequestTransactionOptions = {
id: number
isolationLevel?: IsolationLevel
}

/**
* Prisma's `Promise` that is backwards-compatible. All additions on top of the
* original `Promise` are optional so that it can be backwards-compatible.
Expand Down Expand Up @@ -32,7 +52,7 @@ export interface PrismaPromise<A> extends Promise<A> {

/**
* Called when executing a batch of regular tx
* @param txId for regular tx ids
* @param transaction transaction options for regular tx
*/
requestTransaction?(txId: number, lock?: PromiseLike<void>): PromiseLike<unknown>
requestTransaction?(transaction: RequestTransactionOptions, lock?: PromiseLike<void>): PromiseLike<unknown>
}
28 changes: 18 additions & 10 deletions packages/client/src/runtime/core/request/createPrismaPromise.ts
@@ -1,4 +1,4 @@
import type { PrismaPromise } from './PrismaPromise'
import type { PrismaPromise, PrismaPromiseTransaction } from './PrismaPromise'

/**
* Creates a [[PrismaPromise]]. It is Prisma's implementation of `Promise` which
Expand All @@ -10,13 +10,13 @@ import type { PrismaPromise } from './PrismaPromise'
* @returns
*/
export function createPrismaPromise(
callback: (txId?: string | number, lock?: PromiseLike<void>) => PrismaPromise<unknown>,
callback: (transaction?: PrismaPromiseTransaction, lock?: PromiseLike<void>) => PrismaPromise<unknown>,
): PrismaPromise<unknown> {
let promise: PrismaPromise<unknown> | undefined
const _callback = (txId?: string | number, lock?: PromiseLike<void>) => {
const _callback = (transaction?: PrismaPromiseTransaction, lock?: PromiseLike<void>) => {
try {
// we allow the callback to be executed only one time
return (promise ??= callback(txId, lock))
return (promise ??= callback(transaction, lock))
} catch (error) {
// if the callback throws, then we reject the promise
// and that is because exceptions are not always async
Expand All @@ -26,24 +26,32 @@ export function createPrismaPromise(

return {
then(onFulfilled, onRejected, txId?: string) {
return _callback(txId, undefined).then(onFulfilled, onRejected, txId)
return _callback(itxOptions(txId), undefined).then(onFulfilled, onRejected, txId)
},
catch(onRejected, txId?: string) {
return _callback(txId, undefined).catch(onRejected, txId)
return _callback(itxOptions(txId), undefined).catch(onRejected, txId)
},
finally(onFinally, txId?: string) {
return _callback(txId, undefined).finally(onFinally, txId)
return _callback(itxOptions(txId), undefined).finally(onFinally, txId)
},
requestTransaction(txId: number, lock?: PromiseLike<void>) {
const promise = _callback(txId, lock)
requestTransaction(transactionOptions, lock?: PromiseLike<void>) {
const transaction = { kind: 'batch' as const, ...transactionOptions }
const promise = _callback(transaction, lock)

if (promise.requestTransaction) {
// we want to have support for nested promises
return promise.requestTransaction(txId, lock)
return promise.requestTransaction(transaction, lock)
}

return promise
},
[Symbol.toStringTag]: 'PrismaPromise',
}
}

function itxOptions(id: string | undefined): PrismaPromiseTransaction | undefined {
if (typeof id === 'string') {
return { kind: 'itx', id }
}
return undefined
}

0 comments on commit d88c2fd

Please sign in to comment.