Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): Implement isolation levels for batched transactions #15362

Merged
merged 1 commit into from Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -731,7 +731,7 @@ export class PrismaClient<
*
* Read more in our [docs](https://www.prisma.io/docs/concepts/components/prisma-client/transactions).
*/
$transaction<P extends PrismaPromise<any>[]>(arg: [...P]): Promise<UnwrapTuple<P>>;
$transaction<P extends PrismaPromise<any>[]>(arg: [...P], options?: { isolationLevel?: Prisma.TransactionIsolationLevel }): Promise<UnwrapTuple<P>>;

/**
* \`prisma.post\`: Exposes CRUD operations for the **Post** model.
Expand Down
7 changes: 6 additions & 1 deletion packages/client/src/generation/TSClient/PrismaClient.ts
Expand Up @@ -11,6 +11,11 @@ import { Datasources } from './Datasources'
import type { Generatable } from './Generatable'

function batchingTransactionDefinition(this: PrismaClientClass) {
const args = ['arg: [...P]']
if (this.dmmf.hasEnumInNamespace('TransactionIsolationLevel', 'prisma')) {
args.push('options?: { isolationLevel?: Prisma.TransactionIsolationLevel }')
}
const argsString = args.join(', ')
return `
/**
* Allows the running of a sequence of read/write operations that are guaranteed to either succeed or fail as a whole.
Expand All @@ -25,7 +30,7 @@ function batchingTransactionDefinition(this: PrismaClientClass) {
*
* Read more in our [docs](https://www.prisma.io/docs/concepts/components/prisma-client/transactions).
*/
$transaction<P extends PrismaPromise<any>[]>(arg: [...P]): Promise<UnwrapTuple<P>>;`
$transaction<P extends PrismaPromise<any>[]>(${argsString}): Promise<UnwrapTuple<P>>;`
}

function interactiveTransactionDefinition(this: PrismaClientClass) {
Expand Down
47 changes: 25 additions & 22 deletions packages/client/src/runtime/RequestHandler.ts
Expand Up @@ -9,6 +9,7 @@ import {
PrismaClientRustPanicError,
PrismaClientUnknownRequestError,
} from '.'
import { PrismaPromiseTransaction } from './core/request/PrismaPromise'
import { DataLoader } from './DataLoader'
import type { Client, Unpacker } from './getPrismaClient'
import type { EngineMiddleware } from './MiddlewareHandler'
Expand All @@ -30,11 +31,10 @@ export type RequestParams = {
clientMethod: string
callsite?: CallSite
rejectOnNotFound?: RejectOnNotFound
runInTransaction?: boolean
millsp marked this conversation as resolved.
Show resolved Hide resolved
transaction?: PrismaPromiseTransaction
engineHook?: EngineMiddleware
args: any
headers?: Record<string, string>
transactionId?: string | number
unpacker?: Unpacker
otelParentCtx?: Context
otelChildCtx?: Context
Expand All @@ -48,29 +48,30 @@ export type HandleErrorParams = {

export type Request = {
document: Document
runInTransaction?: boolean
transactionId?: string | number
transaction?: PrismaPromiseTransaction
headers?: Record<string, string>
otelParentCtx?: Context
otelChildCtx?: Context
tracingConfig?: TracingConfig
}

function getRequestInfo(request: Request) {
const txId = request.transactionId
const inTx = request.runInTransaction
const transaction = request.transaction
const headers = request.headers ?? {}
const traceparent = getTraceParent({ tracingConfig: request.tracingConfig })

// if the tx has a number for an id, then it's a regular batch tx
const _inTx = typeof txId === 'number' && inTx ? true : undefined
millsp marked this conversation as resolved.
Show resolved Hide resolved
// if the tx has a string for id, it's an interactive transaction
const _txId = typeof txId === 'string' && inTx ? txId : undefined
if (transaction?.kind === 'itx') {
headers.transactionId = transaction.id
}

if (_txId !== undefined) headers.transactionId = _txId
if (traceparent !== undefined) headers.traceparent = traceparent
if (traceparent !== undefined) {
headers.traceparent = traceparent
}

return { inTx: _inTx, headers }
return {
batchTransaction: transaction?.kind === 'batch' ? transaction : undefined,
headers,
}
}

export class RequestHandler {
Expand All @@ -91,7 +92,9 @@ export class RequestHandler {
// TODO: pass the child information to QE for it to issue links to queries
// const links = requests.map((r) => trace.getSpanContext(r.otelChildCtx!))

return this.client._engine.requestBatch(queries, info.headers, info.inTx)
return this.client._engine.requestBatch(queries, info.headers, {
isolationLevel: info.batchTransaction?.isolationLevel,
})
},
singleLoader: (request) => {
const info = getRequestInfo(request)
Expand All @@ -100,8 +103,8 @@ export class RequestHandler {
return this.client._engine.request(query, info.headers)
},
batchBy: (request) => {
if (request.transactionId) {
return `transaction-${request.transactionId}`
if (request.transaction?.id) {
return `transaction-${request.transaction.id}`
}

return batchFindUniqueBy(request)
Expand All @@ -118,11 +121,10 @@ export class RequestHandler {
callsite,
rejectOnNotFound,
clientMethod,
runInTransaction,
engineHook,
args,
headers,
transactionId,
transaction,
unpacker,
otelParentCtx,
otelChildCtx,
Expand All @@ -149,18 +151,19 @@ export class RequestHandler {
const result = await engineHook(
{
document,
runInTransaction,
runInTransaction: Boolean(transaction),
SevInf marked this conversation as resolved.
Show resolved Hide resolved
},
(params) => {
return this.dataloader.request({ ...params, tracingConfig: this.client._tracingConfig })
},
(params) => this.dataloader.request({ ...params, tracingConfig: this.client._tracingConfig }),
)
data = result.data
elapsed = result.elapsed
} else {
const result = await this.dataloader.request({
document,
runInTransaction,
headers,
transactionId,
transaction,
otelParentCtx,
otelChildCtx,
tracingConfig: this.client._tracingConfig,
Expand Down
28 changes: 21 additions & 7 deletions packages/client/src/runtime/core/model/applyModel.ts
Expand Up @@ -61,13 +61,27 @@ export function applyModel(client: Client, dmmfModelName: string) {
const action = (paramOverrides: O.Optional<InternalRequestParams>) => (userArgs?: UserArgs) => {
const callSite = getCallSite(client._errorFormat) // used for showing better errors

return createPrismaPromise((txId, lock) => {
const data = { args: userArgs, dataPath: [] } // data and its dataPath for nested results
const action = { action: dmmfActionName, model: dmmfModelName } // action name and its related model
const method = { clientMethod: `${jsModelName}.${prop}`, jsModelName } // method name for display only
const tx = { runInTransaction: !!txId, transactionId: txId, lock } // transaction information
const trace = { callsite: callSite } // stack trace
const params = { ...data, ...action, ...method, ...tx, ...trace }
return createPrismaPromise((transaction, lock) => {
const params: InternalRequestParams = {
// data and its dataPath for nested results
args: userArgs,
SevInf marked this conversation as resolved.
Show resolved Hide resolved
dataPath: [],

// action name and its related model
action: dmmfActionName,
model: dmmfModelName,

// method name for display only
clientMethod: `${jsModelName}.${prop}`,
jsModelName,

// transaction information
transaction,
lock,

// stack trace
callsite: callSite,
}

return requestFn({ ...params, ...paramOverrides })
})
Expand Down
37 changes: 29 additions & 8 deletions packages/client/src/runtime/core/request/PrismaPromise.ts
@@ -1,3 +1,21 @@
import { IsolationLevel } from '@prisma/engine-core'

export type PrismaPromiseBatchTransaction = {
kind: 'batch'
id: number
isolationLevel?: IsolationLevel
}

export type PrismaPromiseInteractiveTransaction = {
kind: 'itx'
id: string
}

export type PrismaPromiseTransaction = PrismaPromiseBatchTransaction | PrismaPromiseInteractiveTransaction

export type BatchTransactionOptions = Omit<PrismaPromiseBatchTransaction, 'kind'>
export type InteractiveTransactionOptions = Omit<PrismaPromiseInteractiveTransaction, 'kind'>

/**
* Prisma's `Promise` that is backwards-compatible. All additions on top of the
* original `Promise` are optional so that it can be backwards-compatible.
Expand All @@ -8,31 +26,34 @@ export interface PrismaPromise<A> extends Promise<A> {
* Extension of the original `.then` function
* @param onfulfilled same as regular promises
* @param onrejected same as regular promises
* @param txId GUID for interactive txs
* @param transaction interactive transaction options
*/
then<R1 = A, R2 = never>(
onfulfilled?: (value: A) => R1 | PromiseLike<R1>,
onrejected?: (error: unknown) => R2 | PromiseLike<R2>,
txId?: string,
transaction?: InteractiveTransactionOptions,
): Promise<R1 | R2>

/**
* Extension of the original `.catch` function
* @param onrejected same as regular promises
* @param txId GUID for interactive txs
* @param transaction interactive transaction options
*/
catch<R = never>(onrejected?: ((reason: any) => R | PromiseLike<R>) | undefined | null, txId?: string): Promise<A | R>
catch<R = never>(
onrejected?: ((reason: any) => R | PromiseLike<R>) | undefined | null,
transaction?: InteractiveTransactionOptions,
): Promise<A | R>

/**
* Extension of the original `.finally` function
* @param onfinally same as regular promises
* @param txId GUID for interactive txs
* @param transaction interactive transaction options
*/
finally(onfinally?: (() => void) | undefined | null, txId?: string): Promise<A>
finally(onfinally?: (() => void) | undefined | null, transaction?: InteractiveTransactionOptions): Promise<A>

/**
* Called when executing a batch of regular tx
* @param txId for regular tx ids
* @param transaction transaction options for regular tx
*/
requestTransaction?(txId: number, lock?: PromiseLike<void>): PromiseLike<unknown>
requestTransaction?(transaction: BatchTransactionOptions, lock?: PromiseLike<void>): PromiseLike<unknown>
}
37 changes: 23 additions & 14 deletions packages/client/src/runtime/core/request/createPrismaPromise.ts
@@ -1,4 +1,4 @@
import type { PrismaPromise } from './PrismaPromise'
import type { InteractiveTransactionOptions, PrismaPromise, PrismaPromiseTransaction } from './PrismaPromise'

/**
* Creates a [[PrismaPromise]]. It is Prisma's implementation of `Promise` which
Expand All @@ -10,18 +10,18 @@ import type { PrismaPromise } from './PrismaPromise'
* @returns
*/
export function createPrismaPromise(
callback: (txId?: string | number, lock?: PromiseLike<void>) => PrismaPromise<unknown>,
callback: (transaction?: PrismaPromiseTransaction, lock?: PromiseLike<void>) => PrismaPromise<unknown>,
): PrismaPromise<unknown> {
let promise: PrismaPromise<unknown> | undefined
const _callback = (txId?: string | number, lock?: PromiseLike<void>, cached = true) => {
const _callback = (transaction?: PrismaPromiseTransaction, lock?: PromiseLike<void>, cached = true) => {
try {
// promises cannot be triggered twice after resolving
if (cached === true) {
return (promise ??= callback(txId, lock))
return (promise ??= callback(transaction, lock))
}

// but for for batch tx we need to trigger them again
return callback(txId, lock)
return callback(transaction, lock)
} catch (error) {
// if the callback throws, then we reject the promise
// and that is because exceptions are not always async
Expand All @@ -30,25 +30,34 @@ export function createPrismaPromise(
}

return {
then(onFulfilled, onRejected, txId?: string) {
return _callback(txId, undefined).then(onFulfilled, onRejected, txId)
then(onFulfilled, onRejected, transaction?) {
return _callback(createItx(transaction), undefined).then(onFulfilled, onRejected, transaction)
},
catch(onRejected, txId?: string) {
return _callback(txId, undefined).catch(onRejected, txId)
catch(onRejected, transaction?) {
return _callback(createItx(transaction), undefined).catch(onRejected, transaction)
},
finally(onFinally, txId?: string) {
return _callback(txId, undefined).finally(onFinally, txId)
finally(onFinally, transaction?) {
return _callback(createItx(transaction), undefined).finally(onFinally, transaction)
},
requestTransaction(txId: number, lock?: PromiseLike<void>) {
const promise = _callback(txId, lock, false)

requestTransaction(transactionOptions, lock?: PromiseLike<void>) {
const transaction = { kind: 'batch' as const, ...transactionOptions }
const promise = _callback(transaction, lock, false)

if (promise.requestTransaction) {
// we want to have support for nested promises
return promise.requestTransaction(txId, lock)
return promise.requestTransaction(transaction, lock)
}

return promise
},
[Symbol.toStringTag]: 'PrismaPromise',
}
}

function createItx(transaction: InteractiveTransactionOptions | undefined): PrismaPromiseTransaction | undefined {
if (transaction) {
return { kind: 'itx', ...transaction }
}
return undefined
}