Skip to content

Commit

Permalink
feat(client): Implement isolation levels for bathc transactions
Browse files Browse the repository at this point in the history
Engines part: prisma/prisma-engines#3199

Batch `$transaction` call will now receive optional options
objects with `isolationLevel` property, allowing to pick any level,
supporting by current transaction. Internally, refactors a bunch of
things about the way transaction info is stored in `PrismaPromise`,
`Request` and `InternalRequest` types.

- All transaction-related properties are now grouped under `transaction`
  key. Distiction between iTx and batch transaction is done explcitly
  via discriminated union type.
- `runInTransaction` property is removed from everywhere expcept
  middleware public APIs. `transaction` field now indicates that the
  request should run in transaction.

For testing we are checking that engine correctly generates corresponding SQL.
Properly testing different isolation level effects would be very
difficult, especially in case of batch transactions, where we can not
execute any code between the queries.

Ref #9678
  • Loading branch information
SevInf committed Sep 20, 2022
1 parent 339d6d7 commit 9e86e88
Show file tree
Hide file tree
Showing 17 changed files with 374 additions and 105 deletions.
Expand Up @@ -731,7 +731,7 @@ export class PrismaClient<
*
* Read more in our [docs](https://www.prisma.io/docs/concepts/components/prisma-client/transactions).
*/
$transaction<P extends PrismaPromise<any>[]>(arg: [...P]): Promise<UnwrapTuple<P>>;
$transaction<P extends PrismaPromise<any>[]>(arg: [...P], options?: { isolationLevel?: Prisma.TransactionIsolationLevel }): Promise<UnwrapTuple<P>>;

/**
* \`prisma.post\`: Exposes CRUD operations for the **Post** model.
Expand Down
7 changes: 6 additions & 1 deletion packages/client/src/generation/TSClient/PrismaClient.ts
Expand Up @@ -11,6 +11,11 @@ import { Datasources } from './Datasources'
import type { Generatable } from './Generatable'

function batchingTransactionDefinition(this: PrismaClientClass) {
const args = ['arg: [...P]']
if (this.dmmf.hasEnumInNamespace('TransactionIsolationLevel', 'prisma')) {
args.push('options?: { isolationLevel?: Prisma.TransactionIsolationLevel }')
}
const argsString = args.join(', ')
return `
/**
* Allows the running of a sequence of read/write operations that are guaranteed to either succeed or fail as a whole.
Expand All @@ -25,7 +30,7 @@ function batchingTransactionDefinition(this: PrismaClientClass) {
*
* Read more in our [docs](https://www.prisma.io/docs/concepts/components/prisma-client/transactions).
*/
$transaction<P extends PrismaPromise<any>[]>(arg: [...P]): Promise<UnwrapTuple<P>>;`
$transaction<P extends PrismaPromise<any>[]>(${argsString}): Promise<UnwrapTuple<P>>;`
}

function interactiveTransactionDefinition(this: PrismaClientClass) {
Expand Down
47 changes: 25 additions & 22 deletions packages/client/src/runtime/RequestHandler.ts
Expand Up @@ -9,6 +9,7 @@ import {
PrismaClientRustPanicError,
PrismaClientUnknownRequestError,
} from '.'
import { PrismaPromiseTransaction } from './core/request/PrismaPromise'
import { DataLoader } from './DataLoader'
import type { Client, Unpacker } from './getPrismaClient'
import type { EngineMiddleware } from './MiddlewareHandler'
Expand All @@ -30,11 +31,10 @@ export type RequestParams = {
clientMethod: string
callsite?: CallSite
rejectOnNotFound?: RejectOnNotFound
runInTransaction?: boolean
transaction?: PrismaPromiseTransaction
engineHook?: EngineMiddleware
args: any
headers?: Record<string, string>
transactionId?: string | number
unpacker?: Unpacker
otelParentCtx?: Context
otelChildCtx?: Context
Expand All @@ -48,29 +48,30 @@ export type HandleErrorParams = {

export type Request = {
document: Document
runInTransaction?: boolean
transactionId?: string | number
transaction?: PrismaPromiseTransaction
headers?: Record<string, string>
otelParentCtx?: Context
otelChildCtx?: Context
tracingConfig?: TracingConfig
}

function getRequestInfo(request: Request) {
const txId = request.transactionId
const inTx = request.runInTransaction
const transaction = request.transaction
const headers = request.headers ?? {}
const traceparent = getTraceParent({ tracingConfig: request.tracingConfig })

// if the tx has a number for an id, then it's a regular batch tx
const _inTx = typeof txId === 'number' && inTx ? true : undefined
// if the tx has a string for id, it's an interactive transaction
const _txId = typeof txId === 'string' && inTx ? txId : undefined
if (transaction?.kind === 'itx') {
headers.transactionId = transaction.id
}

if (_txId !== undefined) headers.transactionId = _txId
if (traceparent !== undefined) headers.traceparent = traceparent
if (traceparent !== undefined) {
headers.traceparent = traceparent
}

return { inTx: _inTx, headers }
return {
batchTransaction: transaction?.kind === 'batch' ? transaction : undefined,
headers,
}
}

export class RequestHandler {
Expand All @@ -91,7 +92,9 @@ export class RequestHandler {
// TODO: pass the child information to QE for it to issue links to queries
// const links = requests.map((r) => trace.getSpanContext(r.otelChildCtx!))

return this.client._engine.requestBatch(queries, info.headers, info.inTx)
return this.client._engine.requestBatch(queries, info.headers, {
isolationLevel: info.batchTransaction?.isolationLevel,
})
},
singleLoader: (request) => {
const info = getRequestInfo(request)
Expand All @@ -100,8 +103,8 @@ export class RequestHandler {
return this.client._engine.request(query, info.headers)
},
batchBy: (request) => {
if (request.transactionId) {
return `transaction-${request.transactionId}`
if (request.transaction?.id) {
return `transaction-${request.transaction.id}`
}

return batchFindUniqueBy(request)
Expand All @@ -118,11 +121,10 @@ export class RequestHandler {
callsite,
rejectOnNotFound,
clientMethod,
runInTransaction,
engineHook,
args,
headers,
transactionId,
transaction,
unpacker,
otelParentCtx,
otelChildCtx,
Expand All @@ -149,18 +151,19 @@ export class RequestHandler {
const result = await engineHook(
{
document,
runInTransaction,
runInTransaction: Boolean(transaction),
},
(params) => {
return this.dataloader.request({ ...params, tracingConfig: this.client._tracingConfig })
},
(params) => this.dataloader.request({ ...params, tracingConfig: this.client._tracingConfig }),
)
data = result.data
elapsed = result.elapsed
} else {
const result = await this.dataloader.request({
document,
runInTransaction,
headers,
transactionId,
transaction,
otelParentCtx,
otelChildCtx,
tracingConfig: this.client._tracingConfig,
Expand Down
28 changes: 21 additions & 7 deletions packages/client/src/runtime/core/model/applyModel.ts
Expand Up @@ -61,13 +61,27 @@ export function applyModel(client: Client, dmmfModelName: string) {
const action = (paramOverrides: O.Optional<InternalRequestParams>) => (userArgs?: UserArgs) => {
const callSite = getCallSite(client._errorFormat) // used for showing better errors

return createPrismaPromise((txId, lock) => {
const data = { args: userArgs, dataPath: [] } // data and its dataPath for nested results
const action = { action: dmmfActionName, model: dmmfModelName } // action name and its related model
const method = { clientMethod: `${jsModelName}.${prop}`, jsModelName } // method name for display only
const tx = { runInTransaction: !!txId, transactionId: txId, lock } // transaction information
const trace = { callsite: callSite } // stack trace
const params = { ...data, ...action, ...method, ...tx, ...trace }
return createPrismaPromise((transaction, lock) => {
const params: InternalRequestParams = {
// data and its dataPath for nested results
args: userArgs,
dataPath: [],

// action name and its related model
action: dmmfActionName,
model: dmmfModelName,

// method name for display only
clientMethod: `${jsModelName}.${prop}`,
jsModelName,

// transaction information
transaction,
lock,

// stack trace
callsite: callSite,
}

return requestFn({ ...params, ...paramOverrides })
})
Expand Down
37 changes: 29 additions & 8 deletions packages/client/src/runtime/core/request/PrismaPromise.ts
@@ -1,3 +1,21 @@
import { IsolationLevel } from '@prisma/engine-core'

export type PrismaPromiseBatchTransaction = {
kind: 'batch'
id: number
isolationLevel?: IsolationLevel
}

export type PrismaPromiseInteractiveTransaction = {
kind: 'itx'
id: string
}

export type PrismaPromiseTransaction = PrismaPromiseBatchTransaction | PrismaPromiseInteractiveTransaction

export type BatchTransactionOptions = Omit<PrismaPromiseBatchTransaction, 'kind'>
export type InteractiveTransactionOptions = Omit<PrismaPromiseInteractiveTransaction, 'kind'>

/**
* Prisma's `Promise` that is backwards-compatible. All additions on top of the
* original `Promise` are optional so that it can be backwards-compatible.
Expand All @@ -8,31 +26,34 @@ export interface PrismaPromise<A> extends Promise<A> {
* Extension of the original `.then` function
* @param onfulfilled same as regular promises
* @param onrejected same as regular promises
* @param txId GUID for interactive txs
* @param transaction interactive transaction options
*/
then<R1 = A, R2 = never>(
onfulfilled?: (value: A) => R1 | PromiseLike<R1>,
onrejected?: (error: unknown) => R2 | PromiseLike<R2>,
txId?: string,
transaction?: InteractiveTransactionOptions,
): Promise<R1 | R2>

/**
* Extension of the original `.catch` function
* @param onrejected same as regular promises
* @param txId GUID for interactive txs
* @param transaction interactive transaction options
*/
catch<R = never>(onrejected?: ((reason: any) => R | PromiseLike<R>) | undefined | null, txId?: string): Promise<A | R>
catch<R = never>(
onrejected?: ((reason: any) => R | PromiseLike<R>) | undefined | null,
transaction?: InteractiveTransactionOptions,
): Promise<A | R>

/**
* Extension of the original `.finally` function
* @param onfinally same as regular promises
* @param txId GUID for interactive txs
* @param transaction interactive transaction options
*/
finally(onfinally?: (() => void) | undefined | null, txId?: string): Promise<A>
finally(onfinally?: (() => void) | undefined | null, transaction?: InteractiveTransactionOptions): Promise<A>

/**
* Called when executing a batch of regular tx
* @param txId for regular tx ids
* @param transaction transaction options for regular tx
*/
requestTransaction?(txId: number, lock?: PromiseLike<void>): PromiseLike<unknown>
requestTransaction?(transaction: BatchTransactionOptions, lock?: PromiseLike<void>): PromiseLike<unknown>
}
37 changes: 23 additions & 14 deletions packages/client/src/runtime/core/request/createPrismaPromise.ts
@@ -1,4 +1,4 @@
import type { PrismaPromise } from './PrismaPromise'
import type { InteractiveTransactionOptions, PrismaPromise, PrismaPromiseTransaction } from './PrismaPromise'

/**
* Creates a [[PrismaPromise]]. It is Prisma's implementation of `Promise` which
Expand All @@ -10,18 +10,18 @@ import type { PrismaPromise } from './PrismaPromise'
* @returns
*/
export function createPrismaPromise(
callback: (txId?: string | number, lock?: PromiseLike<void>) => PrismaPromise<unknown>,
callback: (transaction?: PrismaPromiseTransaction, lock?: PromiseLike<void>) => PrismaPromise<unknown>,
): PrismaPromise<unknown> {
let promise: PrismaPromise<unknown> | undefined
const _callback = (txId?: string | number, lock?: PromiseLike<void>, cached = true) => {
const _callback = (transaction?: PrismaPromiseTransaction, lock?: PromiseLike<void>, cached = true) => {
try {
// promises cannot be triggered twice after resolving
if (cached === true) {
return (promise ??= callback(txId, lock))
return (promise ??= callback(transaction, lock))
}

// but for for batch tx we need to trigger them again
return callback(txId, lock)
return callback(transaction, lock)
} catch (error) {
// if the callback throws, then we reject the promise
// and that is because exceptions are not always async
Expand All @@ -30,25 +30,34 @@ export function createPrismaPromise(
}

return {
then(onFulfilled, onRejected, txId?: string) {
return _callback(txId, undefined).then(onFulfilled, onRejected, txId)
then(onFulfilled, onRejected, transaction?) {
return _callback(createItx(transaction), undefined).then(onFulfilled, onRejected, transaction)
},
catch(onRejected, txId?: string) {
return _callback(txId, undefined).catch(onRejected, txId)
catch(onRejected, transaction?) {
return _callback(createItx(transaction), undefined).catch(onRejected, transaction)
},
finally(onFinally, txId?: string) {
return _callback(txId, undefined).finally(onFinally, txId)
finally(onFinally, transaction?) {
return _callback(createItx(transaction), undefined).finally(onFinally, transaction)
},
requestTransaction(txId: number, lock?: PromiseLike<void>) {
const promise = _callback(txId, lock, false)

requestTransaction(transactionOptions, lock?: PromiseLike<void>) {
const transaction = { kind: 'batch' as const, ...transactionOptions }
const promise = _callback(transaction, lock, false)

if (promise.requestTransaction) {
// we want to have support for nested promises
return promise.requestTransaction(txId, lock)
return promise.requestTransaction(transaction, lock)
}

return promise
},
[Symbol.toStringTag]: 'PrismaPromise',
}
}

function createItx(transaction: InteractiveTransactionOptions | undefined): PrismaPromiseTransaction | undefined {
if (transaction) {
return { kind: 'itx', ...transaction }
}
return undefined
}

0 comments on commit 9e86e88

Please sign in to comment.