Last active
October 3, 2025 03:56
-
-
Save jacob-ebey/573756684a0d09e3816e587e51a32b92 to your computer and use it in GitHub Desktop.
Cloudflare ATProto DO PDS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { DurableObject } from "cloudflare:workers"; | |
| import type { Lexicons } from "@atproto/lexicon"; | |
| import { | |
| XrpcClient as BaseXrpcClient, | |
| FetchHandler, | |
| FetchHandlerOptions, | |
| } from "@atproto/xrpc"; | |
| import { | |
| createTemporaryReferenceSet, | |
| decodeAction, | |
| decodeFormState, | |
| decodeReply, | |
| loadServerAction, | |
| renderToReadableStream, | |
| } from "@vitejs/plugin-rsc/rsc"; | |
| import { | |
| unstable_matchRSCServerRequest as matchRSCServerRequest, | |
| type unstable_RSCRouteConfig as RSCRouteConfig, | |
| } from "react-router"; | |
| import { provideAtProtoContext } from "./atproto"; | |
| import { provideRequestContext } from "./request"; | |
| import { provideSessionContext } from "./session"; | |
| import { ERROR_BOUNDARY_ERROR, ERROR_DIGEST_BASE } from "./shared"; | |
| import { | |
| deserializeResponse, | |
| serializeRequestWithoutBody, | |
| serializeResponse, | |
| } from "./transport-tools"; | |
| export { getAtprotoClient } from "./atproto"; | |
| export { | |
| AtprotoOAuthClient, | |
| resolveDidDocument, | |
| resolveDidFromHandle, | |
| } from "./atproto-oauth-client"; | |
| export { getRequest } from "./request"; | |
| export { destroySession, getSession } from "./session"; | |
/**
 * Global `ProtoflareServer` namespace holding the XRPC client type used by
 * this module's `AtpBaseClient` constructor parameters.
 */
declare global {
  namespace ProtoflareServer {
    // Declared as an (empty) interface extending the base `@atproto/xrpc`
    // client. NOTE(review): presumably left empty so applications can merge
    // their generated lexicon client into it via declaration merging — confirm.
    export interface XrpcClient extends BaseXrpcClient {}
  }
}
/**
 * Error whose `digest` carries a JSON-encoded description of an HTTP-style
 * failure (`status`, `statusText`, `data`).
 *
 * The digest is `ERROR_DIGEST_BASE` followed by the JSON array
 * `[ERROR_BOUNDARY_ERROR, status, statusText, data]`; `callServer`'s
 * `onError` hook recognises that prefix/suffix and forwards the digest as-is.
 */
export class BoundaryError extends Error {
  public digest: `${typeof ERROR_DIGEST_BASE}${string}`;

  /**
   * @param init.status Status code to encode in the digest.
   * @param init.statusText Optional status text to encode.
   * @param init.data Optional JSON-serialisable payload to encode.
   */
  constructor(init: { status: number; statusText?: string; data?: unknown }) {
    super(ERROR_DIGEST_BASE);

    const { status, statusText, data } = init;
    const encoded = JSON.stringify([
      ERROR_BOUNDARY_ERROR,
      status,
      statusText,
      data,
    ]);
    this.digest = `${ERROR_DIGEST_BASE}${encoded}`;
  }
}
/**
 * Runs the React Router RSC request matcher for `request` inside the request,
 * session and AT Protocol contexts (entered in that order, outermost first)
 * and returns the resulting `text/x-component` response.
 *
 * @param AtpBaseClient Constructor for the XRPC client, forwarded to the AT Protocol context.
 * @param authNamespace KV namespace forwarded to the AT Protocol context as `namespace`.
 * @param oauthCallbackPathname Forwarded to the AT Protocol context.
 * @param oauthClientMeatadataPathname Forwarded to the AT Protocol context (name kept as spelled for compatibility).
 * @param request Incoming request.
 * @param routes RSC route configuration to match against.
 * @param sessionSecrets Forwarded to the session context as `secrets`.
 * @returns The RSC response produced by `matchRSCServerRequest`.
 */
export function callServer({
  AtpBaseClient,
  authNamespace,
  oauthCallbackPathname,
  oauthClientMeatadataPathname,
  request,
  routes,
  sessionSecrets,
}: {
  AtpBaseClient: new (
    options: FetchHandler | FetchHandlerOptions,
  ) => ProtoflareServer.XrpcClient;
  authNamespace: KVNamespace<string>;
  oauthCallbackPathname: string;
  oauthClientMeatadataPathname: string;
  request: Request;
  routes: RSCRouteConfig;
  sessionSecrets: string[];
}): Promise<Response> {
  // Only digests shaped like `${ERROR_DIGEST_BASE}[...]` are forwarded to the
  // client; every other error yields `undefined`.
  const forwardBoundaryDigest = (error: unknown): string | undefined => {
    if (!error || typeof error !== "object" || !("digest" in error)) {
      return undefined;
    }
    const { digest } = error;
    if (typeof digest !== "string") {
      return undefined;
    }
    const looksLikeBoundaryDigest =
      digest.startsWith(`${ERROR_DIGEST_BASE}[`) && digest.endsWith("]");
    return looksLikeBoundaryDigest ? digest : undefined;
  };

  // Innermost step: match the request and stream the RSC payload.
  const matchAndRender = () =>
    matchRSCServerRequest({
      createTemporaryReferenceSet,
      decodeAction,
      decodeFormState,
      decodeReply,
      loadServerAction,
      request,
      routes,
      generateResponse: (match, options) => {
        const responseHeaders = new Headers(match.headers);
        responseHeaders.set("Content-Type", "text/x-component; charset=utf-8");

        const body = renderToReadableStream(match.payload, {
          temporaryReferences: options.temporaryReferences,
          onError: forwardBoundaryDigest,
        });
        return new Response(body, {
          status: match.statusCode,
          headers: responseHeaders,
        });
      },
    });

  const inAtProtoContext = () =>
    provideAtProtoContext(
      {
        AtpBaseClient,
        namespace: authNamespace,
        oauthCallbackPathname,
        oauthClientMeatadataPathname,
        request,
      },
      matchAndRender,
    );

  const inSessionContext = () =>
    provideSessionContext(
      { request, secrets: sessionSecrets },
      inAtProtoContext,
    );

  return provideRequestContext(request, inSessionContext);
}
/**
 * Hands an RSC server response to the SSR environment for prerendering.
 *
 * The request (without its body) and the server response are serialized,
 * passed to the `prerender` export of the "ssr" environment's "index" entry,
 * and the serialized result is turned back into a response.
 *
 * @param request The incoming request.
 * @param serverResponse The RSC response to prerender.
 * @returns The deserialized SSR response.
 */
export async function prerender(request: Request, serverResponse: Response) {
  type SsrEntry = typeof import("./entry.ssr");
  const ssrEntry = await import.meta.viteRsc.loadModule<SsrEntry>(
    "ssr",
    "index",
  );

  const wireRequest = serializeRequestWithoutBody(request);
  const wireResponse = serializeResponse(serverResponse);
  const rendered = await ssrEntry.prerender(wireRequest, wireResponse);

  return deserializeResponse(rendered);
}
/**
 * Full request pipeline: run the RSC server (`callServer`), then prerender
 * its response through SSR (`prerender`).
 *
 * A failure in either stage is logged with a stage-specific message and the
 * caller receives a plain `500 Internal Server Error` response instead of an
 * exception.
 *
 * @param options Same options object accepted by `callServer`.
 * @returns The prerendered response, or a 500 response on failure.
 */
export async function handleRequest(options: {
  AtpBaseClient: new (
    options: FetchHandler | FetchHandlerOptions,
  ) => ProtoflareServer.XrpcClient;
  authNamespace: KVNamespace<string>;
  oauthCallbackPathname: string;
  oauthClientMeatadataPathname: string;
  request: Request;
  routes: RSCRouteConfig;
  sessionSecrets: string[];
}) {
  const {
    AtpBaseClient,
    authNamespace,
    oauthCallbackPathname,
    oauthClientMeatadataPathname,
    request,
    routes,
    sessionSecrets,
  } = options;

  // Runs one stage; logs under `label` and rethrows when the stage throws
  // (synchronously or asynchronously).
  const runStage = async <T>(
    label: string,
    stage: () => Promise<T>,
  ): Promise<T> => {
    try {
      return await stage();
    } catch (error) {
      console.error(label, error);
      throw error;
    }
  };

  try {
    const serverResponse = await runStage("Error during RSC handling", () =>
      callServer({
        AtpBaseClient,
        authNamespace,
        oauthCallbackPathname,
        oauthClientMeatadataPathname,
        request,
        routes,
        sessionSecrets,
      }),
    );
    return await runStage("Error during SSR prerender", () =>
      prerender(request, serverResponse),
    );
  } catch {
    // Already logged by the failing stage; never leak details to the client.
    return new Response("Internal Server Error", { status: 500 });
  }
}
/**
 * Commit payload attached to a Jetstream message.
 *
 * @typeParam T Collection identifier; also the `$type` of the record.
 */
type JetStreamCommit<T extends string> = {
  rev: string;
  type: string;
  /** Compared against "create" by `FirehoseDurableObject` to decide whether to validate `record`. */
  operation: string;
  collection: T;
  rkey: string;
  record: { $type: T };
  cid: string;
};

/**
 * Shape of a message received from the Jetstream WebSocket subscription.
 *
 * @typeParam T Collection identifiers the subscriber is interested in.
 */
export type JetStreamMessage<T extends string = string> = {
  did: string;
  /** Event timestamp; used by `FirehoseDurableObject` as the subscription cursor. */
  time_us: number;
  type: string;
  /** `FirehoseDurableObject` only processes messages whose kind is "commit". */
  kind: string;
  commit?: JetStreamCommit<T>;
};
/** Minimal shape of a lexicons module: a map from names to lexicon ids. */
type LexiconsModule = {
  ids: { [name: string]: string };
};

/** Union of every id value declared in `Lexicon["ids"]`. */
type LexiconIds<Lexicon extends LexiconsModule> =
  Lexicon["ids"][keyof Lexicon["ids"]];
/**
 * Durable Object that holds a WebSocket subscription to a Jetstream endpoint
 * and forwards matching commit events to a caller-supplied handler.
 *
 * Lifecycle:
 * - On construction the stored cursor ("lastEventTime") is loaded and the
 *   socket is opened, both inside a single `blockConcurrencyWhile` section so
 *   the cursor is guaranteed to be loaded before the subscription URL is built.
 * - Every message updates the in-memory last-seen time. Commit messages whose
 *   collection has a known lexicon (and, for "create", a record that
 *   validates) are passed to the handler; once the handler succeeds the
 *   message time is persisted as the new cursor.
 * - A rolling 5 second alarm is (re)armed to keep the object alive, and a
 *   socket close aborts the object so it restarts from the persisted cursor.
 *
 * @typeParam Lexicon Lexicons module whose `ids` define the collections of interest.
 */
export class FirehoseDurableObject<
  Lexicon extends LexiconsModule,
> extends DurableObject {
  /** `time_us` of the most recently received message, handled or not. */
  #lastEventTime = 0;
  /** Highest `time_us` already written to storage under "lastEventTime". */
  #persistedEventTime = 0;
  #websocket: WebSocket | null = null;
  #wantedCollections: Set<LexiconIds<Lexicon>>;
  #lexicons: Lexicons;
  #handleMessage: (
    message: JetStreamMessage<LexiconIds<Lexicon>>,
  ) => void | Promise<void>;

  /**
   * @param ctx Durable Object state.
   * @param env Worker environment bindings.
   * @param lexicons Lexicon registry used to look up and validate records.
   * @param wantedCollections Collections to request from Jetstream.
   * @param handleMessage Called for every accepted commit message.
   */
  constructor(
    ctx: DurableObjectState,
    env: Cloudflare.Env,
    lexicons: Lexicons,
    wantedCollections: LexiconIds<Lexicon>[],
    handleMessage: (
      message: JetStreamMessage<LexiconIds<Lexicon>>,
    ) => void | Promise<void>,
  ) {
    super(ctx, env);
    this.#lexicons = lexicons;
    this.#wantedCollections = new Set(wantedCollections);
    this.#handleMessage = handleMessage;

    // Load the cursor and connect in ONE critical section: the subscription
    // URL embeds the cursor, so it must not be built before the load finishes.
    ctx
      .blockConcurrencyWhile(async () => {
        const stored = (await ctx.storage.get<number>("lastEventTime")) ?? 0;
        this.#lastEventTime = stored;
        this.#persistedEventTime = stored;
        await this.#startLoop();
      })
      .catch((error: unknown) => {
        console.error("Loop failed with error: ", error);
      });
  }

  /** Returns the `time_us` of the most recently received message (0 if none). */
  getLastEventTime(): number {
    return this.#lastEventTime;
  }

  /** Alarm handler: re-arms the keep-alive alarm. */
  async alarm() {
    await this.#resetAlarm();
  }

  /**
   * Opens the Jetstream WebSocket and wires up its listeners.
   *
   * Resolves once the socket is open; rejects with the socket's error event
   * if it errors first.
   */
  async #startLoop(): Promise<void> {
    this.#resetAlarmInBackground();

    await new Promise<void>((resolve, reject) => {
      const url = new URL("wss://jetstream1.us-west.bsky.network/subscribe");
      url.searchParams.set("cursor", String(this.#lastEventTime));
      for (const collection of this.#wantedCollections) {
        url.searchParams.append("wantedCollections", collection);
      }

      console.info("Connecting to ", url.href);
      const socket = new WebSocket(url.href);
      this.#websocket = socket;

      socket.addEventListener("open", () => {
        console.info("Connected to Jetstream.");
        resolve();
      });
      socket.addEventListener("error", (err) => {
        // No-op if the promise already resolved on "open".
        reject(err);
        console.error("Got error from WebSocket: ", err);
        this.#resetAlarmInBackground();
      });
      socket.addEventListener("close", (event) => {
        // Abort the object so the next activation reconnects from the
        // persisted cursor.
        this.ctx.abort(`Reset due to disconnect: ${event.reason}`);
      });
      socket.addEventListener("message", (event) => {
        this.#onMessage(event.data);
      });
    });
  }

  /**
   * Handles one raw WebSocket frame. Never throws: malformed frames and
   * validation failures are logged and dropped.
   */
  #onMessage(data: unknown): void {
    let message: JetStreamMessage<LexiconIds<Lexicon>>;
    try {
      if (typeof data !== "string") {
        throw new TypeError("expected a text frame");
      }
      const parsed: unknown = JSON.parse(data);
      if (
        !parsed ||
        typeof parsed !== "object" ||
        !("time_us" in parsed) ||
        typeof parsed.time_us !== "number"
      ) {
        throw new TypeError("message has no numeric time_us");
      }
      message = parsed as JetStreamMessage<LexiconIds<Lexicon>>;
    } catch (error) {
      console.error("Ignoring malformed Jetstream message: ", error);
      return;
    }

    this.#lastEventTime = message.time_us;

    const { commit } = message;
    if (message.kind !== "commit" || !commit) {
      return;
    }

    // Accept only collections with a known lexicon; "create" records must
    // also validate. NOTE(review): other operations (e.g. updates) are
    // accepted without validation, as before — confirm this is intended.
    const uri = `lex:${commit.collection}`;
    let accepted = false;
    try {
      if (this.#lexicons.get(uri)) {
        accepted =
          commit.operation === "create"
            ? this.#lexicons.validate(uri, commit.record).success
            : true;
      }
    } catch (error) {
      console.error("Failed to validate Jetstream record: ", error);
    }
    if (!accepted) {
      return;
    }

    const eventTime = message.time_us;
    (async () => this.#handleMessage(message))()
      .then(() => {
        // Compare against the PERSISTED cursor, not the last-seen time: the
        // latter was already advanced above, so comparing against it would
        // never be true and the cursor would never be stored.
        if (eventTime > this.#persistedEventTime) {
          this.#persistedEventTime = eventTime;
          this.ctx.waitUntil(this.ctx.storage.put("lastEventTime", eventTime));
        }
      })
      .catch(console.error);
  }

  /** Fire-and-forget `#resetAlarm` that logs instead of leaking a rejection. */
  #resetAlarmInBackground(): void {
    this.#resetAlarm().catch((error: unknown) => {
      console.error("Failed to reset keep-alive alarm: ", error);
    });
  }

  /** Arms an alarm 5 seconds out unless one is already pending in the future. */
  async #resetAlarm() {
    const alarm = await this.ctx.storage.getAlarm();
    // If we have an alarm set that is not in the past then
    // don't set another one.
    if (alarm && alarm - Date.now() > 0) {
      return;
    }
    // Set an alarm 5 seconds in the future to ensure the DO stays alive.
    await this.ctx.storage.setAlarm(Date.now() + 5000);
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { AtUri } from "@atproto/syntax"; | |
| import { DurableObject } from "cloudflare:workers"; | |
/** One page of records plus an optional cursor for fetching the next page. */
type RecordList = {
  /** Present only when more records may follow. */
  cursor?: string;
  records: Array<{
    uri: string;
    cid: string;
    value: { $type: string };
  }>;
};
/**
 * Front-door Durable Object that routes record operations to the per-repo
 * `REPO` Durable Object, addressed by repo name.
 */
export class PDS extends DurableObject {
  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
  }

  /**
   * Stores `record` in the repo named by the host part of `uri`.
   *
   * @returns `{ success }` from the repo, or `{ success: false }` if the call throws.
   */
  async createRecord(
    { cid, uri }: { cid: string; uri: string },
    record: unknown,
  ): Promise<{ success: boolean }> {
    const repoNamespace = this.env.REPO;

    try {
      const repoName = AtUri.make(uri).host;
      const repoStub = repoNamespace.getByName(repoName);
      using outcome = await repoStub.createRecord({ cid, uri }, record);
      return { success: outcome.success };
    } catch (error) {
      console.error("Failed to call REPO.createRecord", error);
      return { success: false };
    }
  }

  /**
   * Deletes the record `collection`/`rkey` from the repo named `repo`.
   *
   * @returns `{ success }` from the repo, or `{ success: false }` if the call throws.
   */
  async deleteRecord({
    collection,
    repo,
    rkey,
  }: {
    collection: string;
    repo: string;
    rkey: string;
  }) {
    const repoNamespace = this.env.REPO;

    try {
      const repoStub = repoNamespace.getByName(repo);
      using outcome = await repoStub.deleteRecord({ collection, rkey });
      return { success: outcome.success };
    } catch (error) {
      console.error("Failed to call REPO.deleteRecord", error);
      return { success: false };
    }
  }

  /**
   * Lists records of `collection` from the repo named `repo`, forwarding the
   * paging options unchanged. Errors from the repo propagate to the caller.
   */
  async listRecords({
    collection,
    cursor,
    limit,
    repo,
    reverse,
  }: {
    collection: string;
    cursor?: string;
    limit?: number;
    repo: string;
    reverse?: boolean;
  }): Promise<RecordList> {
    const query = { collection, cursor, limit, repo, reverse };
    using page = await this.env.REPO.getByName(repo).listRecords(query);

    // Copy the fields out so the returned object outlives the disposed result.
    return {
      records: page.records,
      cursor: page.cursor,
    };
  }
}
/**
 * Per-repo Durable Object that stores records in its SQLite storage.
 *
 * Rows live in a single `records` table keyed informally by
 * (`collection`, `rkey`); the record body is stored as JSON text.
 */
export class RepoStorage extends DurableObject {
  /** Creates the `records` table and its indexes if they do not exist yet. */
  constructor(state: DurableObjectState, env: Env) {
    super(state, env);

    const {
      storage: { sql },
    } = this.ctx;

    // The (collection, rkey) index serves listRecords/deleteRecord, which
    // filter on collection and rkey; the original cid-leading index is kept
    // so existing databases are left untouched.
    sql.exec(/* SQL */ `
      CREATE TABLE IF NOT EXISTS records (
        cid TEXT NOT NULL,
        collection TEXT NOT NULL,
        rkey TEXT NOT NULL,
        record TEXT NOT NULL,
        createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );
      CREATE INDEX IF NOT EXISTS idx_records_cid_collection_rkey ON records (cid, collection, rkey);
      CREATE INDEX IF NOT EXISTS idx_records_collection_rkey ON records (collection, rkey);
    `);
  }

  /**
   * Inserts a record row; `collection` and `rkey` are taken from `uri`.
   *
   * NOTE(review): nothing prevents two rows with the same collection/rkey.
   * If callers can send the same URI twice (e.g. updates), listRecords will
   * return duplicates — consider an upsert once existing data is checked.
   *
   * @returns `{ success: true }` when a row was written, otherwise `{ success: false }`.
   */
  async createRecord(
    { cid, uri }: { cid: string; uri: string },
    record: unknown,
  ): Promise<{ success: boolean }> {
    const {
      storage: { sql },
    } = this.ctx;

    try {
      const { collection, rkey } = AtUri.make(uri);
      const inserted = sql.exec(
        /* SQL */ `
          INSERT INTO records (cid, collection, rkey, record)
          VALUES (?, ?, ?, ?);
        `,
        cid,
        collection,
        rkey,
        JSON.stringify(record),
      );
      if (!inserted.rowsWritten) {
        throw new Error("records row not written to database");
      }
      return { success: true };
    } catch (error) {
      console.error("Failed to insert record", error);
      return { success: false };
    }
  }

  /**
   * Deletes every row matching `collection` and `rkey`.
   *
   * @returns `{ success: true }` unless the statement throws (deleting nothing still succeeds).
   */
  async deleteRecord({
    collection,
    rkey,
  }: {
    collection: string;
    rkey: string;
  }): Promise<{ success: boolean }> {
    const {
      storage: { sql },
    } = this.ctx;

    try {
      sql.exec(
        /* SQL */ `
          DELETE FROM records
          WHERE collection = ? AND rkey = ?;
        `,
        collection,
        rkey,
      );
      return { success: true };
    } catch (error) {
      console.error("Failed to delete record", error);
      return { success: false };
    }
  }

  /**
   * Lists one page of records for `collection`, ordered by `rkey`
   * (descending by default, ascending when `reverse` is set).
   *
   * @param collection Collection to list.
   * @param cursor `rkey` returned by a previous page; results start strictly after it in the chosen order.
   * @param limit Page size; truncated to an integer and clamped to 0–100, defaulting to 100 when missing or not finite.
   * @param repo Repo identifier used to build each record's `uri`.
   * @param reverse When true, list in ascending `rkey` order.
   * @returns The page of records, plus a `cursor` when the page was full.
   */
  async listRecords({
    collection,
    cursor,
    limit,
    repo,
    reverse,
  }: {
    collection: string;
    cursor?: string;
    limit?: number;
    repo: string;
    reverse?: boolean;
  }): Promise<RecordList> {
    const {
      storage: { sql },
    } = this.ctx;

    // SQL LIMIT needs an integer; NaN/Infinity/fractions must not reach it.
    const requested =
      typeof limit === "number" && Number.isFinite(limit)
        ? Math.trunc(limit)
        : 100;
    const pageSize = Math.max(0, Math.min(requested, 100));

    const order = reverse ? "ASC" : "DESC";
    const params: (string | number)[] = [collection];
    if (cursor) {
      params.push(cursor);
    }
    params.push(pageSize);

    const query = /* SQL */ `
      SELECT cid, rkey, record
      FROM records
      WHERE collection = ?
      ${cursor ? `AND rkey ${reverse ? ">" : "<"} ?` : ""}
      ORDER BY rkey ${order}, cid ${order}
      LIMIT ?;
    `;

    const rows = sql
      .exec<{
        cid: string;
        rkey: string;
        record: string;
      }>(query, ...params)
      .toArray();

    const records = rows.map((row) => ({
      uri: AtUri.make(repo, collection, row.rkey).href,
      cid: row.cid,
      value: {
        $type: collection,
        ...JSON.parse(row.record),
      },
    }));

    // A full page may have more rows behind it. The emptiness guard matters
    // when pageSize is 0: an empty page would otherwise count as "full" and
    // dereference a missing last row.
    const lastRow = rows.length > 0 ? rows[rows.length - 1] : undefined;
    const newCursor =
      lastRow !== undefined && rows.length === pageSize
        ? lastRow.rkey
        : undefined;

    return {
      records,
      cursor: newCursor,
    };
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment