import type { Prisma } from '@prisma/client';
import type { Scope } from 'effect';
import { Cause, Context, Data, Duration, Effect, Exit, Option } from 'effect';
import { dual, pipe } from 'effect/Function';

// NOTE(review): `prisma` (the client whose `$transaction` is called below) is not
// imported anywhere in the visible source — confirm where it is brought into scope.

/** Transaction timeout applied when `Options.timeout` is not given. */
const DEFAULT_TIMEOUT: Duration.DurationInput = '20 seconds';

/**
 * Extra milliseconds added to the timeout handed to Prisma, so that the Effect-side
 * timeout fires before Prisma's own one does.
 */
const PRISMA_TIMEOUT_BUFFER_MS = 500;

/** Rejection reason used to make the `prisma.$transaction()` callback fail on purpose. */
const ROLLBACK_SYMBOL = Symbol('PrismaTransactionClient.ROLLBACK');

/**
 * Access the `Prisma.TransactionClient`.
 *
 * @example
 * ```ts
 * Effect.gen(function* () {
 *   const tx = yield* PrismaTransactionClient.PrismaTransactionClient;
 *   const account = yield* tryPrismaPromise(async () => tx.account.findFirst());
 *   return account;
 * }).pipe(PrismaTransactionClient.provide({ maxWait: '5 seconds', timeout: '10 seconds' }));
 * ```
 */
class PrismaTransactionClient extends Context.Tag('PrismaTransactionClient')<
  PrismaTransactionClient,
  Prisma.TransactionClient
>() {}

/**
 * The failure raised by {@link provide} when the wrapped effect does not complete
 * within the configured transaction timeout.
 */
class PrismaTransactionTimeoutError extends Data.TaggedError('PrismaTransactionTimeoutError') {}

type Options = {
  /**
   * The maximum amount of time Prisma Client will wait to acquire a transaction from the database.
   *
   * @defaultValue Inherited from the Prisma Client.
   */
  maxWait?: Duration.DurationInput;
  /**
   * The maximum amount of time the interactive transaction can run before being canceled and rolled back.
   *
   * @defaultValue {@link DEFAULT_TIMEOUT}
   */
  timeout?: Duration.DurationInput;
  /**
   * The isolation level of the transaction.
   *
   * @defaultValue Inherited from the Prisma Client.
   */
  isolationLevel?: Prisma.TransactionIsolationLevel;
};

/**
 * Everything the finalizer needs to end one running Prisma transaction.
 */
type TransactionHandle = {
  /** The client scoped to the open transaction. */
  readonly client: Prisma.TransactionClient;
  /** Let the transaction callback resolve and wait for `prisma.$transaction()` to settle. */
  readonly commit: () => Promise<void>;
  /** Make the transaction callback reject and wait for `prisma.$transaction()` to settle. */
  readonly rollback: () => Promise<void>;
  /** Make the transaction callback reject without waiting for `prisma.$transaction()`. */
  readonly rollbackWithoutWaiting: () => void;
};

/**
 * Swallow the rejection that {@link ROLLBACK_SYMBOL} produces; re-throw anything else.
 */
const ignoreRollbackSignal = (reason: unknown): void => {
  if (reason !== ROLLBACK_SYMBOL) {
    throw reason;
  }
};

/**
 * Start a Prisma interactive transaction and keep it open until the returned handle
 * is told to commit or roll back.
 *
 * All state is created per call, so every run of the surrounding effect gets its own
 * transaction instead of sharing promises with a previous run.
 *
 * @param options - Caller supplied transaction options.
 * @param timeout - The already decoded transaction timeout.
 * @returns A promise of the handle; it rejects if `prisma.$transaction()` rejects
 *   before its callback was invoked.
 */
const beginTransaction = (
  options: Options,
  timeout: Duration.Duration,
): Promise<TransactionHandle> =>
  new Promise<TransactionHandle>((resolveHandle, rejectHandle) => {
    let openGate: () => void = () => undefined;
    let failGate: () => void = () => undefined;

    // The transaction callback returns this promise, so the transaction stays open
    // until the gate is resolved (commit) or rejected (rollback).
    const gate = new Promise<void>((resolve, reject) => {
      openGate = () => resolve();
      failGate = () => reject(ROLLBACK_SYMBOL);
    });

    const settled: Promise<void> = prisma.$transaction(
      (tx) => {
        resolveHandle({
          client: tx,
          commit: () => {
            openGate();
            return settled;
          },
          rollback: () => {
            failGate();
            // Presumably Prisma re-throws the callback's rejection reason once it has
            // rolled back; that reason is our own signal, not an error to report.
            return settled.catch(ignoreRollbackSignal);
          },
          rollbackWithoutWaiting: () => {
            failGate();
          },
        });
        return gate;
      },
      {
        // Nullish checks (not truthiness) so that an explicit zero duration is forwarded.
        ...(options.maxWait != null && { maxWait: Duration.toMillis(options.maxWait) }),
        ...(options.isolationLevel != null && { isolationLevel: options.isolationLevel }),
        // Although we are manually controlling the transaction timeout by `Effect.timeoutFail`,
        // we still need to pass the timeout config to Prisma.
        // This allows Prisma to drop pending queries after the transaction timeout.
        // Prisma will not abort ongoing queries immediately, it just prevents next queries from being executed.
        // Add some buffer time to the timeout to avoid the Effect dying from the Prisma error.
        timeout: Duration.toMillis(timeout) + PRISMA_TIMEOUT_BUFFER_MS,
      },
    );

    // If the transaction fails before the callback ran, no handle will ever be produced:
    // reject instead of leaving the acquisition pending forever. After the handle was
    // resolved this call is a no-op, but it still counts as a handler on `settled`, so a
    // rollback that nobody awaits (timeout path) cannot become an unhandled rejection.
    void settled.catch(rejectHandle);
  });

/**
 * Provide the `PrismaTransactionClient` service by creating a new Prisma transaction.
 *
 * The transaction is committed when the enclosing `Scope` closes successfully and
 * rolled back when it closes with a failure. If `self` does not finish within the
 * configured timeout, the result fails with {@link PrismaTransactionTimeoutError}.
 *
 * @example
 * ```ts
 * Effect.gen(function* () {
 *   const tx = yield* PrismaTransactionClient.PrismaTransactionClient;
 *   const account = yield* tryPrismaPromise(async () => tx.account.findFirst());
 *   return account;
 * }).pipe(PrismaTransactionClient.provide({ maxWait: '5 seconds', timeout: '10 seconds' }));
 * ```
 */
// NOTE(review): the generic parameter lists were missing from the reviewed text; the ones
// below are reconstructed from what the implementation infers — confirm against history.
const provide = dual<
  (
    options: Options,
  ) => <A, E, R>(
    self: Effect.Effect<A, E, R>,
  ) => Effect.Effect<
    A,
    E | PrismaTransactionTimeoutError,
    Scope.Scope | Exclude<R, PrismaTransactionClient>
  >,
  <A, E, R>(
    self: Effect.Effect<A, E, R>,
    options: Options,
  ) => Effect.Effect<
    A,
    E | PrismaTransactionTimeoutError,
    Scope.Scope | Exclude<R, PrismaTransactionClient>
  >
>(2, (self, options) => {
  const timeout: Duration.Duration = Duration.decode(options.timeout ?? DEFAULT_TIMEOUT);

  const transaction = Effect.acquireRelease(
    // Acquire
    // Create a new Prisma transaction; a rejection here surfaces as a defect.
    Effect.promise(() => beginTransaction(options, timeout)),
    // Finalizer
    // Commit or rollback the transaction based on the exit status.
    (handle, exit) => {
      if (Exit.isFailure(exit)) {
        const error = Option.getOrUndefined(Cause.failureOption(exit.cause));
        const isTimeout = error instanceof PrismaTransactionTimeoutError;

        return isTimeout
          ? // If the exit is caused by the transaction timeout, don't wait for the promise to resolve.
            // The Prisma transaction won't abort ongoing queries immediately on timeout,
            // so waiting here would keep the timeout from working as expected.
            // We also pass the timeout config to Prisma, so it's fine to not wait for the promise.
            Effect.sync(() => handle.rollbackWithoutWaiting())
          : Effect.promise(() => handle.rollback());
      }

      return Effect.promise(() => handle.commit());
    },
  );

  return pipe(
    self,
    Effect.provideServiceEffect(
      PrismaTransactionClient,
      Effect.map(transaction, (handle) => handle.client),
    ),
    // We use this to control the transaction timeout manually.
    // This is because the Prisma transaction timeout won't reject the promise exactly on the timeout.
    // Our approach fails the Effect on the timeout, and the finalizer then rejects the
    // Prisma transaction promise.
    Effect.timeoutFail({
      duration: timeout,
      // This error reaches the `Effect.acquireRelease` finalizer (and triggers a rollback)
      // if it is not caught by the caller first.
      onTimeout: () => new PrismaTransactionTimeoutError(),
    }),
  );
});

export { PrismaTransactionClient, PrismaTransactionTimeoutError, provide };