Last active
December 11, 2024 01:06
-
-
Save tim-smart/bc32caa3d0a817c7418d58421c2a7d5d to your computer and use it in GitHub Desktop.
Revisions
-
tim-smart revised this gist
Dec 11, 2024 . 1 changed file with 36 additions and 20 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,8 +2,8 @@ import { Data, Effect, Layer, pipe, PubSub, Schedule, Scope, Stream, @@ -16,6 +16,7 @@ import type { } from "@atcute/client/lexicons" import "@atcute/bluesky/lexicons" import { NodeRuntime, NodeSocket } from "@effect/platform-node" import { Jetstream as SWJetstream } from "@skyware/jetstream" export class JetstreamError extends Data.TaggedError("JetstreamError")<{ message: string @@ -27,14 +28,12 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { // access the WebSocket constructor for the current platform const ws = yield* Socket.WebSocketConstructor const subscribe = < WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, >(options: { readonly endpoint?: string readonly wantedCollections?: Array<WantedCollections> readonly startCursor?: number }) => Effect.gen(function* () { const scope = yield* Effect.scope @@ -48,15 +47,12 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { }) // create a mailbox to receive events const pubsub = yield* PubSub.unbounded< | CommitEvent<ResolveLexiconWildcard<WantedCollections>> | AccountEvent | IdentityEvent >() yield* Scope.addFinalizer(scope, pubsub.shutdown) // track the cursor for the last event received let cursor = options.startCursor ?? 0 @@ -98,17 +94,17 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { return } pubsub.unsafeOffer(event) return } case EventType.Account: { if (!event.account?.did) return pubsub.unsafeOffer(event) return } case EventType.Identity: { if (!event.identity?.did) return pubsub.unsafeOffer(event) return } } @@ -128,11 +124,7 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { Effect.forkIn(scope), ) return pubsub }) const stream = < @@ -142,9 +134,12 @@ class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { readonly wantedCollections?: Array<WantedCollections> readonly startCursor?: number }) => subscribe(options).pipe( Effect.map((pubsub) => Stream.fromPubSub(pubsub)), Stream.unwrapScoped, ) return { subscribe, stream } as const }), }) {} @@ -337,3 +332,24 @@ export type Commit<RecordType extends string> = | CommitCreate<RecordType> | CommitUpdate<RecordType> | CommitDelete<RecordType> Stream.asyncPush<any, Error>((emit) => Effect.gen(function* () { const jetstream = yield* Effect.acquireRelease( Effect.sync( () => new SWJetstream({ wantedCollections: ["app.bsky.feed.post"], }), ), (js) => Effect.sync(() => js.close()), ) jetstream.start() jetstream.onCreate("app.bsky.feed.post", (record) => { emit.single(record) }) jetstream.on("error", (error) => { emit.fail(error) }) }), ).pipe(Stream.retry(Schedule.exponential("1 second"))) -
tim-smart created this gist
Dec 11, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,339 @@ import { Data, Effect, Layer, Mailbox, pipe, Schedule, Scope, Stream, } from "effect" import { Socket } from "@effect/platform" import type { At, ComAtprotoSyncSubscribeRepos, Records, } from "@atcute/client/lexicons" import "@atcute/bluesky/lexicons" import { NodeRuntime, NodeSocket } from "@effect/platform-node" export class JetstreamError extends Data.TaggedError("JetstreamError")<{ message: string cause: unknown }> {} class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { effect: Effect.gen(function* () { // access the WebSocket constructor for the current platform const ws = yield* Socket.WebSocketConstructor const mailbox = < WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, >(options: { readonly endpoint?: string readonly wantedCollections?: Array<WantedCollections> readonly startCursor?: number readonly bufferSize?: number readonly strategy?: "sliding" | "dropping" }) => Effect.gen(function* () { const scope = yield* Effect.scope // create a URL for the Jetstream endpoint const url = new URL( options.endpoint ?? "wss://jetstream1.us-east.bsky.network/subscribe", ) options.wantedCollections?.forEach((collection) => { url.searchParams.append("wantedCollections", collection) }) // create a mailbox to receive events const mailbox = yield* Mailbox.make< | CommitEvent<ResolveLexiconWildcard<WantedCollections>> | AccountEvent | IdentityEvent >({ capacity: options.bufferSize, strategy: options.strategy, }) yield* Scope.addFinalizer(scope, mailbox.shutdown) // track the cursor for the last event received let cursor = options.startCursor ?? 0 // create a WebSocket connection to the Jetstream endpoint // It effectfully constructs the URL based on the current cursor const socket = yield* Socket.makeWebSocket( Effect.sync(() => { if (cursor > 0) { url.searchParams.set("cursor", cursor.toString()) } return url.toString() }), ).pipe(Effect.provideService(Socket.WebSocketConstructor, ws)) yield* pipe( socket.runRaw((message) => { try { const event = JSON.parse(message as string) as | CommitEvent<ResolveLexiconWildcard<WantedCollections>> | AccountEvent | IdentityEvent if (event.time_us < cursor) return cursor = event.time_us switch (event.kind) { case EventType.Commit: { if ( !event.commit?.collection || !event.commit.rkey || !event.commit.rev ) { return } if ( event.commit.operation === CommitType.Create && !event.commit.record ) { return } mailbox.unsafeOffer(event) return } case EventType.Account: { if (!event.account?.did) return mailbox.unsafeOffer(event) return } case EventType.Identity: { if (!event.identity?.did) return mailbox.unsafeOffer(event) return } } } catch (error) { return new JetstreamError({ message: "Failed to parse event", cause: error, }) } }), Effect.tapErrorCause(Effect.logWarning), Effect.retry( Schedule.exponential("1 second").pipe( Schedule.union(Schedule.spaced("10 second")), ), ), Effect.forkIn(scope), ) return mailbox as Mailbox.ReadonlyMailbox< | CommitEvent<ResolveLexiconWildcard<WantedCollections>> | AccountEvent | IdentityEvent > }) const stream = < WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, >(options: { readonly endpoint?: string readonly wantedCollections?: Array<WantedCollections> readonly startCursor?: number }) => mailbox(options).pipe(Effect.map(Mailbox.toStream), Stream.unwrapScoped) return { mailbox, stream } as const }), }) {} // ---------------------------------------------------------------------------- Effect.gen(function* () { const jetstream = yield* Jetstream yield* jetstream .stream({ wantedCollections: ["app.bsky.feed.post"] }) .pipe(Stream.runForEach(Effect.log)) }).pipe( Effect.provide( Jetstream.Default.pipe(Layer.provide(NodeSocket.layerWebSocketConstructor)), ), NodeRuntime.runMain, ) // ---------------------------------------------------------------------------- /** Resolves a lexicon name to its record operation. */ export type ResolveLexicon<T extends string> = T extends keyof Records ? Records[T] : { $type: T } /** Checks if any member of a union is assignable to a given operation. */ type UnionMemberIsAssignableTo<Union, AssignableTo> = // Distribute over union members Union extends Union ? // `Union` here refers to a given union member Union extends AssignableTo ? true : never : never /** Resolves a wildcard string to the record types it matches. */ export type ResolveLexiconWildcard<T extends string> = // Match the prefix T extends `${infer Prefix}*` ? // Check that at least one collection name matches the prefix (we use `true extends` because `never` extends everything) true extends UnionMemberIsAssignableTo< keyof Records, `${Prefix}${string}` > ? // If so, return known matching collection names keyof Records & `${Prefix}${string}` extends infer Lexicon extends string ? Lexicon : never : // If no collection name matches the prefix, return as a operation-level wildcard string `${Prefix}${string}` : // If there's no wildcard, return the original string T /** The name of a collection. */ export type Collection = keyof Records | (string & {}) /** Generates all possible wildcard strings that match a given collection name. */ type PossibleCollectionWildcards<CollectionName extends string> = CollectionName extends `${infer Prefix}.${infer Suffix}` ? `${Prefix}.*` | `${Prefix}.${PossibleCollectionWildcards<Suffix>}` : never /** The name of a collection or a wildcard string matching multiple collections. */ export type CollectionOrWildcard = | PossibleCollectionWildcards<keyof Records> | Collection /** * The types of events that are emitted by {@link Jetstream}. * @enum */ export const EventType = { /** A new commit. */ Commit: "commit", /** An account's status was updated. */ Account: "account", /** An account's identity was updated. */ Identity: "identity", } as const export type EventType = (typeof EventType)[keyof typeof EventType] /** * The types of commits that can be received. * @enum */ export const CommitType = { /** A record was created. */ Create: "create", /** A record was updated. */ Update: "update", /** A record was deleted. */ Delete: "delete", } as const export type CommitType = (typeof CommitType)[keyof typeof CommitType] /** * The base operation for events emitted by the {@link Jetstream} class. */ export interface EventBase { did: At.DID time_us: number kind: EventType } /** * A commit event. Represents a commit to a user repository. */ export interface CommitEvent<RecordType extends string> extends EventBase { kind: typeof EventType.Commit commit: Commit<RecordType> } /** A commit event where a record was created. */ export interface CommitCreateEvent<RecordType extends string> extends CommitEvent<RecordType> { commit: CommitCreate<RecordType> } /** A commit event where a record was updated. */ export interface CommitUpdateEvent<RecordType extends string> extends CommitEvent<RecordType> { commit: CommitUpdate<RecordType> } /** A commit event where a record was deleted. */ export interface CommitDeleteEvent<RecordType extends string> extends CommitEvent<RecordType> { commit: CommitDelete<RecordType> } /** * An account event. Represents a change to an account's status on a host (e.g. PDS or Relay). */ export interface AccountEvent extends EventBase { kind: typeof EventType.Account account: ComAtprotoSyncSubscribeRepos.Account } /** * An identity event. Represents a change to an account's identity. */ export interface IdentityEvent extends EventBase { kind: typeof EventType.Identity identity: ComAtprotoSyncSubscribeRepos.Identity } /** * The base operation for commit events. */ export interface CommitBase<RecordType extends string> { operation: CommitType rev: string collection: RecordType rkey: string } /** * A commit event representing a new record. */ export interface CommitCreate<RecordType extends string> extends CommitBase<RecordType> { operation: typeof CommitType.Create record: ResolveLexicon<RecordType> cid: At.CID } /** * A commit event representing an update to an existing record. */ export interface CommitUpdate<RecordType extends string> extends CommitBase<RecordType> { operation: typeof CommitType.Update record: ResolveLexicon<RecordType> cid: At.CID } /** * A commit event representing a deletion of an existing record. */ export interface CommitDelete<RecordType extends string> extends CommitBase<RecordType> { operation: typeof CommitType.Delete } /** * A commit event. */ export type Commit<RecordType extends string> = | CommitCreate<RecordType> | CommitUpdate<RecordType> | CommitDelete<RecordType>