Skip to content

Instantly share code, notes, and snippets.

@ryanleecode
Created April 28, 2024 05:02
Show Gist options
  • Select an option

  • Save ryanleecode/32954a7ec1a15bfb1bfb1b87288839fc to your computer and use it in GitHub Desktop.

Select an option

Save ryanleecode/32954a7ec1a15bfb1bfb1b87288839fc to your computer and use it in GitHub Desktop.

Revisions

  1. ryanleecode created this gist Apr 28, 2024.
    52 changes: 52 additions & 0 deletions trigger_effect_helpers.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,52 @@
    import type { IO, IOTask, Json, RunTaskOptions } from '@trigger.dev/sdk';
    import { Cause, Deferred, Effect, Exit } from 'effect';

    export const runTask =
    <T extends Json<T>, E = never>(
    cacheKey: string,
    effect: (task: IOTask, io: IO) => Effect.Effect<T, E, never>,
    options?: RunTaskOptions & {
    parseOutput?: (output: unknown) => T;
    },
    ) =>
    (io: IO) =>
    Effect.gen(function* (_) {
    const tDeferred = yield* _(Deferred.make<T, E>());

    yield* _(
    Effect.fork(
    Effect.async<T, E, never>((register) => {
    io.runTask(
    cacheKey,
    async (t, io) => {
    const { promise, resolve, reject } = Promise.withResolvers<T>();
    register(
    effect(t, io).pipe(
    Effect.onExit(
    Exit.matchEffect({
    onFailure: (cause) =>
    Effect.all([
    Deferred.failCause(tDeferred, cause),
    Effect.sync(() =>
    reject(Cause.originalError(cause)),
    ),
    ]),
    onSuccess: (t) =>
    Effect.all([
    Deferred.succeed(tDeferred, t),
    Effect.sync(() => resolve(t)),
    ]),
    }),
    ),
    ),
    );
    return promise;
    },
    options,
    ).catch((_) => {});
    }),
    ),
    );

    return yield* _(Deferred.await(tDeferred));
    });