Last active
April 26, 2025 18:58
-
-
Save hiramhuang/e2534a2a326c7929268fd06457edf096 to your computer and use it in GitHub Desktop.
Prisma Interactive Transaction Client for Effect TS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import type { Prisma } from '@prisma/client'; | |
| import type { Scope } from 'effect'; | |
| import { Cause, Context, Data, Duration, Effect, Exit, Option } from 'effect'; | |
| import { dual, pipe } from 'effect/Function'; | |
| const DEFAULT_TIMEOUT: Duration.DurationInput = '20 seconds'; | |
| const ROLLBACK_SYMBOL = Symbol('PrismaTransactionClient.ROLLBACK'); | |
| /** | |
| * Access the `Prisma.TransactionClient`. | |
| * | |
| * @example | |
| * ```ts | |
| * Effect.gen(function* () { | |
| * const tx = yield* PrismaTransactionClient.PrismaTransactionClient; | |
| * const account = yield* tryPrismaPromise(async () => tx.account.findFirst()); | |
| * return account; | |
| * }).pipe(PrismaTransactionClient.provide({ maxWait: '5 seconds', timeout: '10 seconds' })); | |
| * ``` | |
| */ | |
| class PrismaTransactionClient extends Context.Tag('PrismaTransactionClient')< | |
| PrismaTransactionClient, | |
| Prisma.TransactionClient | |
| >() {} | |
| /** | |
| * The error thrown when the `Scope` is not closed before the configured transaction timeout. | |
| */ | |
| class PrismaTransactionTimeoutError extends Data.TaggedError('PrismaTransactionTimeoutError') {} | |
| type Options = { | |
| /** | |
| * The maximum amount of time Prisma Client will wait to acquire a transaction from the database. | |
| * | |
| * @defaultValue Inherited from the Prisma Client. | |
| */ | |
| maxWait?: Duration.DurationInput; | |
| /** | |
| * The maximum amount of time the interactive transaction can run before being canceled and rolled back. | |
| * | |
| * @defaultValue {@link DEFAULT_TIMEOUT} | |
| */ | |
| timeout?: Duration.DurationInput; | |
| /** | |
| * The isolation level of the transaction. | |
| * | |
| * @defaultValue Inherited from the Prisma Client. | |
| */ | |
| isolationLevel?: Prisma.TransactionIsolationLevel; | |
| }; | |
| /** | |
| * Provide the `PrismaTransactionClient` service by creating a new Prisma transaction. | |
| * | |
| * @example | |
| * ```ts | |
| * Effect.gen(function* () { | |
| * const tx = yield* PrismaTransactionClient.PrismaTransactionClient; | |
| * const account = yield* tryPrismaPromise(async () => tx.account.findFirst()); | |
| * return account; | |
| * }).pipe(PrismaTransactionClient.provide({ maxWait: '5 seconds', timeout: '10 seconds' })); | |
| * ``` | |
| */ | |
| const provide = dual< | |
| ( | |
| options: Options, | |
| ) => <A, E, R>( | |
| self: Effect.Effect<A, E, R>, | |
| ) => Effect.Effect<A, E | PrismaTransactionTimeoutError, Scope.Scope | Exclude<R, PrismaTransactionClient>>, | |
| <A, E, R>( | |
| self: Effect.Effect<A, E, R>, | |
| options: Options, | |
| ) => Effect.Effect<A, E | PrismaTransactionTimeoutError, Scope.Scope | Exclude<R, PrismaTransactionClient>> | |
| >(2, (self, options) => { | |
| const timeout: Duration.Duration = Duration.decode(options.timeout ?? DEFAULT_TIMEOUT); | |
| let setTxClient: (txClient: Prisma.TransactionClient) => void; | |
| let setPrismaTransactionPromise: (prismaTxPromise: Promise<void>) => void; | |
| let resolveTxPromise: () => void; | |
| let rejectTxPromise: () => void; | |
| /** | |
| * Resolve this `Promise` to get the `Prisma.TransactionClient`. | |
| */ | |
| const getTxClient = new Promise<Prisma.TransactionClient>((resolve) => { | |
| setTxClient = (o) => resolve(o); | |
| }); | |
| /** | |
| * Resolve this `Promise` to get the `Promise` returned by `prisma.$transaction()`. | |
| */ | |
| const prismaTransactionPromise = new Promise<Promise<void>>((resolve) => { | |
| setPrismaTransactionPromise = (o) => resolve(o); | |
| }); | |
| /** | |
| * This `Promise` is for `prisma.$transaction()` to resolve. | |
| * | |
| * This Promise is used to control the transaction lifecycle: | |
| * - It resolves when {@link commit} is called | |
| * - It rejects with {@link ROLLBACK_SYMBOL} when {@link rollback} is called | |
| */ | |
| const txPromise = new Promise<void>((resolve, reject) => { | |
| resolveTxPromise = () => resolve(); | |
| rejectTxPromise = () => reject(ROLLBACK_SYMBOL); | |
| }); | |
| /** | |
| * Commit the transaction and resolve the `prisma.$transaction()` Promise. | |
| */ | |
| const commit = async () => { | |
| resolveTxPromise(); | |
| return prismaTransactionPromise; | |
| }; | |
| /** | |
| * Rollback the transaction and resolve the `prisma.$transaction()` Promise. | |
| */ | |
| const rollback = async () => { | |
| rejectTxPromise(); | |
| return prismaTransactionPromise; | |
| }; | |
| /** | |
| * Rollback the transaction without resolving the `prisma.$transaction()` Promise. | |
| */ | |
| const rollbackWithoutResolve = () => { | |
| rejectTxPromise(); | |
| }; | |
| return pipe( | |
| self, | |
| Effect.provideServiceEffect( | |
| PrismaTransactionClient, | |
| Effect.acquireRelease( | |
| // Acquire | |
| // Create a new Prisma transaction and return the `Prisma.TransactionClient`. | |
| Effect.promise(() => { | |
| setPrismaTransactionPromise( | |
| prisma.$transaction( | |
| (tx) => { | |
| setTxClient(tx); | |
| return txPromise; | |
| }, | |
| { | |
| ...(options.maxWait && { maxWait: Duration.toMillis(options.maxWait) }), | |
| ...(!!options.isolationLevel && { isolationLevel: options.isolationLevel }), | |
| // Although we are manually controlling the transaction timeout by `Effect.timeoutFail`, | |
| // we still need to pass the timeout config to Prisma. | |
| // This allows Prisma to drop pending queries after the transaction timeout. | |
| // Prisma will not abort ongoing queries immediately, it just prevents next queries from being executed. | |
| // Add some buffer time to the timeout to avoid the Effect die from the Prisma error. | |
| timeout: Duration.toMillis(timeout) + 500, | |
| }, | |
| ), | |
| ); | |
| return getTxClient; | |
| }), | |
| // Finalizer | |
| // Commit or rollback the transaction based on the exit status. | |
| (_tx, exit) => { | |
| if (Exit.isFailure(exit)) { | |
| const error = Option.getOrUndefined(Cause.failureOption(exit.cause)); | |
| const isTimeout = error instanceof PrismaTransactionTimeoutError; | |
| return isTimeout | |
| ? // If the exit is caused by the transaction timeout, don't wait for promise to resolve. | |
| // Because the Prisma transaction won't abort ongoing queries immediately on timeout, | |
| // so if we wait here, it will cause the timeout not work as expected. | |
| // We have also pass the timeout config to Prisma, so it's fine to not wait for the promise to resolve. | |
| Effect.sync(() => rollbackWithoutResolve()) | |
| : Effect.promise(() => rollback()); | |
| } | |
| return Effect.promise(() => commit()); | |
| }, | |
| ), | |
| ), | |
| // We use this to control the transaction timeout manually. | |
| // This is because the Prisma transaction timeout won't reject the promise exactly on the timeout. | |
| // Our approach will terminate the Effect Fiber exactly on the timeout, and also reject the Prisma transaction promise. | |
| Effect.timeoutFail({ | |
| duration: timeout, | |
| // This error will be caught by the `Effect.acquireRelease` and rolled back if not caught by the caller. | |
| onTimeout: () => new PrismaTransactionTimeoutError(), | |
| }), | |
| ); | |
| }); | |
| export { PrismaTransactionClient, PrismaTransactionTimeoutError, provide }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| export * as PrismaTransactionClient from './client'; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
References: