Skip to content

Instantly share code, notes, and snippets.

@tim-smart
Last active December 11, 2024 01:06
Show Gist options
  • Save tim-smart/bc32caa3d0a817c7418d58421c2a7d5d to your computer and use it in GitHub Desktop.
Save tim-smart/bc32caa3d0a817c7418d58421c2a7d5d to your computer and use it in GitHub Desktop.

Revisions

  1. tim-smart revised this gist Dec 11, 2024. 1 changed file with 36 additions and 20 deletions.
    56 changes: 36 additions & 20 deletions jetstream.ts
    Original file line number Diff line number Diff line change
    @@ -2,8 +2,8 @@ import {
    Data,
    Effect,
    Layer,
    Mailbox,
    pipe,
    PubSub,
    Schedule,
    Scope,
    Stream,
    @@ -16,6 +16,7 @@ import type {
    } from "@atcute/client/lexicons"
    import "@atcute/bluesky/lexicons"
    import { NodeRuntime, NodeSocket } from "@effect/platform-node"
    import { Jetstream as SWJetstream } from "@skyware/jetstream"

    export class JetstreamError extends Data.TaggedError("JetstreamError")<{
    message: string
    @@ -27,14 +28,12 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
    // access the WebSocket constructor for the current platform
    const ws = yield* Socket.WebSocketConstructor

    const mailbox = <
    const subscribe = <
    WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
    >(options: {
    readonly endpoint?: string
    readonly wantedCollections?: Array<WantedCollections>
    readonly startCursor?: number
    readonly bufferSize?: number
    readonly strategy?: "sliding" | "dropping"
    }) =>
    Effect.gen(function* () {
    const scope = yield* Effect.scope
    @@ -48,15 +47,12 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
    })

    // create a mailbox to receive events
    const mailbox = yield* Mailbox.make<
    const pubsub = yield* PubSub.unbounded<
    | CommitEvent<ResolveLexiconWildcard<WantedCollections>>
    | AccountEvent
    | IdentityEvent
    >({
    capacity: options.bufferSize,
    strategy: options.strategy,
    })
    yield* Scope.addFinalizer(scope, mailbox.shutdown)
    >()
    yield* Scope.addFinalizer(scope, pubsub.shutdown)

    // track the cursor for the last event received
    let cursor = options.startCursor ?? 0
    @@ -98,17 +94,17 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
    return
    }

    mailbox.unsafeOffer(event)
    pubsub.unsafeOffer(event)
    return
    }
    case EventType.Account: {
    if (!event.account?.did) return
    mailbox.unsafeOffer(event)
    pubsub.unsafeOffer(event)
    return
    }
    case EventType.Identity: {
    if (!event.identity?.did) return
    mailbox.unsafeOffer(event)
    pubsub.unsafeOffer(event)
    return
    }
    }
    @@ -128,11 +124,7 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
    Effect.forkIn(scope),
    )

    return mailbox as Mailbox.ReadonlyMailbox<
    | CommitEvent<ResolveLexiconWildcard<WantedCollections>>
    | AccountEvent
    | IdentityEvent
    >
    return pubsub
    })

    const stream = <
    @@ -142,9 +134,12 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
    readonly wantedCollections?: Array<WantedCollections>
    readonly startCursor?: number
    }) =>
    mailbox(options).pipe(Effect.map(Mailbox.toStream), Stream.unwrapScoped)
    subscribe(options).pipe(
    Effect.map((pubsub) => Stream.fromPubSub(pubsub)),
    Stream.unwrapScoped,
    )

    return { mailbox, stream } as const
    return { subscribe, stream } as const
    }),
    }) {}

    @@ -337,3 +332,24 @@ export type Commit<RecordType extends string> =
    | CommitCreate<RecordType>
    | CommitUpdate<RecordType>
    | CommitDelete<RecordType>

    Stream.asyncPush<any, Error>((emit) =>
    Effect.gen(function* () {
    const jetstream = yield* Effect.acquireRelease(
    Effect.sync(
    () =>
    new SWJetstream({
    wantedCollections: ["app.bsky.feed.post"],
    }),
    ),
    (js) => Effect.sync(() => js.close()),
    )
    jetstream.start()
    jetstream.onCreate("app.bsky.feed.post", (record) => {
    emit.single(record)
    })
    jetstream.on("error", (error) => {
    emit.fail(error)
    })
    }),
    ).pipe(Stream.retry(Schedule.exponential("1 second")))
  2. tim-smart created this gist Dec 11, 2024.
    339 changes: 339 additions & 0 deletions jetstream.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,339 @@
    import {
    Data,
    Effect,
    Layer,
    Mailbox,
    pipe,
    Schedule,
    Scope,
    Stream,
    } from "effect"
    import { Socket } from "@effect/platform"
    import type {
    At,
    ComAtprotoSyncSubscribeRepos,
    Records,
    } from "@atcute/client/lexicons"
    import "@atcute/bluesky/lexicons"
    import { NodeRuntime, NodeSocket } from "@effect/platform-node"

    export class JetstreamError extends Data.TaggedError("JetstreamError")<{
    message: string
    cause: unknown
    }> {}

    class Jetstream extends Effect.Service<Jetstream>()("Jetstream", {
    effect: Effect.gen(function* () {
    // access the WebSocket constructor for the current platform
    const ws = yield* Socket.WebSocketConstructor

    const mailbox = <
    WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
    >(options: {
    readonly endpoint?: string
    readonly wantedCollections?: Array<WantedCollections>
    readonly startCursor?: number
    readonly bufferSize?: number
    readonly strategy?: "sliding" | "dropping"
    }) =>
    Effect.gen(function* () {
    const scope = yield* Effect.scope

    // create a URL for the Jetstream endpoint
    const url = new URL(
    options.endpoint ?? "wss://jetstream1.us-east.bsky.network/subscribe",
    )
    options.wantedCollections?.forEach((collection) => {
    url.searchParams.append("wantedCollections", collection)
    })

    // create a mailbox to receive events
    const mailbox = yield* Mailbox.make<
    | CommitEvent<ResolveLexiconWildcard<WantedCollections>>
    | AccountEvent
    | IdentityEvent
    >({
    capacity: options.bufferSize,
    strategy: options.strategy,
    })
    yield* Scope.addFinalizer(scope, mailbox.shutdown)

    // track the cursor for the last event received
    let cursor = options.startCursor ?? 0

    // create a WebSocket connection to the Jetstream endpoint
    // It effectfully constructs the URL based on the current cursor
    const socket = yield* Socket.makeWebSocket(
    Effect.sync(() => {
    if (cursor > 0) {
    url.searchParams.set("cursor", cursor.toString())
    }
    return url.toString()
    }),
    ).pipe(Effect.provideService(Socket.WebSocketConstructor, ws))

    yield* pipe(
    socket.runRaw((message) => {
    try {
    const event = JSON.parse(message as string) as
    | CommitEvent<ResolveLexiconWildcard<WantedCollections>>
    | AccountEvent
    | IdentityEvent

    if (event.time_us < cursor) return
    cursor = event.time_us
    switch (event.kind) {
    case EventType.Commit: {
    if (
    !event.commit?.collection ||
    !event.commit.rkey ||
    !event.commit.rev
    ) {
    return
    }
    if (
    event.commit.operation === CommitType.Create &&
    !event.commit.record
    ) {
    return
    }

    mailbox.unsafeOffer(event)
    return
    }
    case EventType.Account: {
    if (!event.account?.did) return
    mailbox.unsafeOffer(event)
    return
    }
    case EventType.Identity: {
    if (!event.identity?.did) return
    mailbox.unsafeOffer(event)
    return
    }
    }
    } catch (error) {
    return new JetstreamError({
    message: "Failed to parse event",
    cause: error,
    })
    }
    }),
    Effect.tapErrorCause(Effect.logWarning),
    Effect.retry(
    Schedule.exponential("1 second").pipe(
    Schedule.union(Schedule.spaced("10 second")),
    ),
    ),
    Effect.forkIn(scope),
    )

    return mailbox as Mailbox.ReadonlyMailbox<
    | CommitEvent<ResolveLexiconWildcard<WantedCollections>>
    | AccountEvent
    | IdentityEvent
    >
    })

    const stream = <
    WantedCollections extends CollectionOrWildcard = CollectionOrWildcard,
    >(options: {
    readonly endpoint?: string
    readonly wantedCollections?: Array<WantedCollections>
    readonly startCursor?: number
    }) =>
    mailbox(options).pipe(Effect.map(Mailbox.toStream), Stream.unwrapScoped)

    return { mailbox, stream } as const
    }),
    }) {}

    // ----------------------------------------------------------------------------

    Effect.gen(function* () {
    const jetstream = yield* Jetstream

    yield* jetstream
    .stream({ wantedCollections: ["app.bsky.feed.post"] })
    .pipe(Stream.runForEach(Effect.log))
    }).pipe(
    Effect.provide(
    Jetstream.Default.pipe(Layer.provide(NodeSocket.layerWebSocketConstructor)),
    ),
    NodeRuntime.runMain,
    )

    // ----------------------------------------------------------------------------

    /** Resolves a lexicon name to its record operation. */
    export type ResolveLexicon<T extends string> = T extends keyof Records
    ? Records[T]
    : { $type: T }

    /** Checks if any member of a union is assignable to a given operation. */
    type UnionMemberIsAssignableTo<Union, AssignableTo> =
    // Distribute over union members
    Union extends Union
    ? // `Union` here refers to a given union member
    Union extends AssignableTo
    ? true
    : never
    : never

    /** Resolves a wildcard string to the record types it matches. */
    export type ResolveLexiconWildcard<T extends string> =
    // Match the prefix
    T extends `${infer Prefix}*`
    ? // Check that at least one collection name matches the prefix (we use `true extends` because `never` extends everything)
    true extends UnionMemberIsAssignableTo<
    keyof Records,
    `${Prefix}${string}`
    >
    ? // If so, return known matching collection names
    keyof Records & `${Prefix}${string}` extends infer Lexicon extends
    string
    ? Lexicon
    : never
    : // If no collection name matches the prefix, return as a operation-level wildcard string
    `${Prefix}${string}`
    : // If there's no wildcard, return the original string
    T

    /** The name of a collection. */
    export type Collection = keyof Records | (string & {})

    /** Generates all possible wildcard strings that match a given collection name. */
    type PossibleCollectionWildcards<CollectionName extends string> =
    CollectionName extends `${infer Prefix}.${infer Suffix}`
    ? `${Prefix}.*` | `${Prefix}.${PossibleCollectionWildcards<Suffix>}`
    : never

    /** The name of a collection or a wildcard string matching multiple collections. */
    export type CollectionOrWildcard =
    | PossibleCollectionWildcards<keyof Records>
    | Collection

    /**
    * The types of events that are emitted by {@link Jetstream}.
    * @enum
    */
    export const EventType = {
    /** A new commit. */
    Commit: "commit",
    /** An account's status was updated. */
    Account: "account",
    /** An account's identity was updated. */
    Identity: "identity",
    } as const
    export type EventType = (typeof EventType)[keyof typeof EventType]

    /**
    * The types of commits that can be received.
    * @enum
    */
    export const CommitType = {
    /** A record was created. */
    Create: "create",
    /** A record was updated. */
    Update: "update",
    /** A record was deleted. */
    Delete: "delete",
    } as const
    export type CommitType = (typeof CommitType)[keyof typeof CommitType]

    /**
    * The base operation for events emitted by the {@link Jetstream} class.
    */
    export interface EventBase {
    did: At.DID
    time_us: number
    kind: EventType
    }

    /**
    * A commit event. Represents a commit to a user repository.
    */
    export interface CommitEvent<RecordType extends string> extends EventBase {
    kind: typeof EventType.Commit
    commit: Commit<RecordType>
    }

    /** A commit event where a record was created. */
    export interface CommitCreateEvent<RecordType extends string>
    extends CommitEvent<RecordType> {
    commit: CommitCreate<RecordType>
    }

    /** A commit event where a record was updated. */
    export interface CommitUpdateEvent<RecordType extends string>
    extends CommitEvent<RecordType> {
    commit: CommitUpdate<RecordType>
    }

    /** A commit event where a record was deleted. */
    export interface CommitDeleteEvent<RecordType extends string>
    extends CommitEvent<RecordType> {
    commit: CommitDelete<RecordType>
    }

    /**
    * An account event. Represents a change to an account's status on a host (e.g. PDS or Relay).
    */
    export interface AccountEvent extends EventBase {
    kind: typeof EventType.Account
    account: ComAtprotoSyncSubscribeRepos.Account
    }

    /**
    * An identity event. Represents a change to an account's identity.
    */
    export interface IdentityEvent extends EventBase {
    kind: typeof EventType.Identity
    identity: ComAtprotoSyncSubscribeRepos.Identity
    }

    /**
    * The base operation for commit events.
    */
    export interface CommitBase<RecordType extends string> {
    operation: CommitType
    rev: string
    collection: RecordType
    rkey: string
    }

    /**
    * A commit event representing a new record.
    */
    export interface CommitCreate<RecordType extends string>
    extends CommitBase<RecordType> {
    operation: typeof CommitType.Create
    record: ResolveLexicon<RecordType>
    cid: At.CID
    }

    /**
    * A commit event representing an update to an existing record.
    */
    export interface CommitUpdate<RecordType extends string>
    extends CommitBase<RecordType> {
    operation: typeof CommitType.Update
    record: ResolveLexicon<RecordType>
    cid: At.CID
    }

    /**
    * A commit event representing a deletion of an existing record.
    */
    export interface CommitDelete<RecordType extends string>
    extends CommitBase<RecordType> {
    operation: typeof CommitType.Delete
    }

    /**
    * A commit event.
    */
    export type Commit<RecordType extends string> =
    | CommitCreate<RecordType>
    | CommitUpdate<RecordType>
    | CommitDelete<RecordType>