Skip to content

Instantly share code, notes, and snippets.

@jacob-ebey
Last active October 3, 2025 03:56
Show Gist options
  • Save jacob-ebey/573756684a0d09e3816e587e51a32b92 to your computer and use it in GitHub Desktop.
Save jacob-ebey/573756684a0d09e3816e587e51a32b92 to your computer and use it in GitHub Desktop.
Cloudflare ATProto DO PDS
import { DurableObject } from "cloudflare:workers";
import type { Lexicons } from "@atproto/lexicon";
import {
XrpcClient as BaseXrpcClient,
FetchHandler,
FetchHandlerOptions,
} from "@atproto/xrpc";
import {
createTemporaryReferenceSet,
decodeAction,
decodeFormState,
decodeReply,
loadServerAction,
renderToReadableStream,
} from "@vitejs/plugin-rsc/rsc";
import {
unstable_matchRSCServerRequest as matchRSCServerRequest,
type unstable_RSCRouteConfig as RSCRouteConfig,
} from "react-router";
import { provideAtProtoContext } from "./atproto";
import { provideRequestContext } from "./request";
import { provideSessionContext } from "./session";
import { ERROR_BOUNDARY_ERROR, ERROR_DIGEST_BASE } from "./shared";
import {
deserializeResponse,
serializeRequestWithoutBody,
serializeResponse,
} from "./transport-tools";
export { getAtprotoClient } from "./atproto";
export {
AtprotoOAuthClient,
resolveDidDocument,
resolveDidFromHandle,
} from "./atproto-oauth-client";
export { getRequest } from "./request";
export { destroySession, getSession } from "./session";
declare global {
namespace ProtoflareServer {
export interface XrpcClient extends BaseXrpcClient {}
}
}
export class BoundaryError extends Error {
public digest: `${typeof ERROR_DIGEST_BASE}${string}`;
constructor({
status,
statusText,
data,
}: {
status: number;
statusText?: string;
data?: unknown;
}) {
super(ERROR_DIGEST_BASE);
this.digest = `${ERROR_DIGEST_BASE}${JSON.stringify([ERROR_BOUNDARY_ERROR, status, statusText, data])}`;
}
}
export function callServer({
AtpBaseClient,
authNamespace,
oauthCallbackPathname,
oauthClientMeatadataPathname,
request,
routes,
sessionSecrets,
}: {
AtpBaseClient: new (
options: FetchHandler | FetchHandlerOptions,
) => ProtoflareServer.XrpcClient;
authNamespace: KVNamespace<string>;
oauthCallbackPathname: string;
oauthClientMeatadataPathname: string;
request: Request;
routes: RSCRouteConfig;
sessionSecrets: string[];
}): Promise<Response> {
return provideRequestContext(request, () =>
provideSessionContext({ request, secrets: sessionSecrets }, () =>
provideAtProtoContext(
{
AtpBaseClient,
namespace: authNamespace,
oauthCallbackPathname,
oauthClientMeatadataPathname,
request,
},
() => {
return matchRSCServerRequest({
createTemporaryReferenceSet,
decodeAction,
decodeFormState,
decodeReply,
loadServerAction,
request,
routes,
generateResponse(match, { temporaryReferences }) {
const headers = new Headers(match.headers);
headers.set("Content-Type", "text/x-component; charset=utf-8");
return new Response(
renderToReadableStream(match.payload, {
temporaryReferences,
onError(error: unknown) {
if (
error &&
typeof error === "object" &&
"digest" in error &&
typeof error.digest === "string" &&
error.digest.startsWith(`${ERROR_DIGEST_BASE}[`) &&
error.digest.endsWith("]")
) {
return error.digest;
}
},
}),
{
status: match.statusCode,
headers,
},
);
},
});
},
),
),
);
}
export async function prerender(request: Request, serverResponse: Response) {
const ssr = await import.meta.viteRsc.loadModule<
typeof import("./entry.ssr")
>("ssr", "index");
const ssrResponse = await ssr.prerender(
serializeRequestWithoutBody(request),
serializeResponse(serverResponse),
);
return deserializeResponse(ssrResponse);
}
export async function handleRequest({
AtpBaseClient,
authNamespace,
oauthCallbackPathname,
oauthClientMeatadataPathname,
request,
routes,
sessionSecrets,
}: {
AtpBaseClient: new (
options: FetchHandler | FetchHandlerOptions,
) => ProtoflareServer.XrpcClient;
authNamespace: KVNamespace<string>;
oauthCallbackPathname: string;
oauthClientMeatadataPathname: string;
request: Request;
routes: RSCRouteConfig;
sessionSecrets: string[];
}) {
try {
let serverResponse: Response;
try {
serverResponse = await callServer({
AtpBaseClient,
authNamespace,
oauthCallbackPathname,
oauthClientMeatadataPathname,
request,
routes,
sessionSecrets,
});
} catch (error) {
console.error("Error during RSC handling", error);
throw error;
}
try {
return await prerender(request, serverResponse);
} catch (error) {
console.error("Error during SSR prerender", error);
throw error;
}
} catch (error) {
return new Response("Internal Server Error", { status: 500 });
}
}
export type JetStreamMessage<T extends string = string> = {
did: string;
time_us: number;
type: string;
kind: string;
commit?: {
rev: string;
type: string;
operation: string;
collection: T;
rkey: string;
record: {
$type: T;
};
cid: string;
};
};
type LexiconsModule = {
ids: Record<string, string>;
};
type LexiconIds<Lexicon extends LexiconsModule> =
Lexicon["ids"][keyof Lexicon["ids"]];
export class FirehoseDurableObject<
Lexicon extends LexiconsModule,
> extends DurableObject {
#lastEventTime: number;
#websocket: WebSocket | null = null;
#wantedCollections: Set<LexiconIds<Lexicon>>;
#lexicons: Lexicons;
#handleMessage: (
message: JetStreamMessage<LexiconIds<Lexicon>>,
) => void | Promise<void>;
constructor(
ctx: DurableObjectState,
env: Cloudflare.Env,
lexicons: Lexicons,
wantedCollections: LexiconIds<Lexicon>[],
handleMessage: (
message: JetStreamMessage<LexiconIds<Lexicon>>,
) => void | Promise<void>,
) {
super(ctx, env);
this.#lexicons = lexicons;
this.#wantedCollections = new Set(wantedCollections);
this.#handleMessage = handleMessage;
this.#lastEventTime = 0;
ctx.blockConcurrencyWhile(async () => {
this.#lastEventTime = (await ctx.storage.get("lastEventTime")) ?? 0;
});
const onLoopError = (x: any) => {
console.error("Loop failed with error: ", x);
};
this.#startLoop().catch(onLoopError);
}
getLastEventTime(): number {
return this.#lastEventTime;
}
async alarm() {
await this.#resetAlarm();
}
async #startLoop() {
this.#resetAlarm();
await this.ctx.blockConcurrencyWhile(async () => {
return new Promise<void>((resolve, reject) => {
let url = new URL("wss://jetstream1.us-west.bsky.network/subscribe");
url.searchParams.set("cursor", String(this.#lastEventTime));
for (const collection of this.#wantedCollections) {
url.searchParams.append("wantedCollections", collection);
}
console.info("Connecting to ", url.href);
this.#websocket = new WebSocket(url);
this.#websocket.addEventListener("open", () => {
console.info("Connected to Jetstream.");
resolve();
});
this.#websocket.addEventListener("error", (err) => {
reject(err);
console.error("Got error from WebSocket: ", err);
this.#resetAlarm();
});
this.#websocket.addEventListener("close", (event) => {
this.ctx.abort(`Reset due to disconnect: ${event.reason}`);
});
this.#websocket.addEventListener("message", (event) => {
const message: JetStreamMessage<LexiconIds<Lexicon>> = JSON.parse(
event.data as string,
);
this.#lastEventTime = message.time_us;
if (message.kind === "commit" && message.commit) {
const uri = `lex:${message.commit.collection}`;
const lexicon = this.#lexicons.get(uri);
const valid = lexicon
? message.commit.operation === "create"
? this.#lexicons.validate(uri, message.commit.record)
: { success: true }
: { success: false };
if (valid.success) {
(async () => this.#handleMessage(message))()
.then(() => {
if (message.time_us > this.#lastEventTime) {
this.ctx.waitUntil(
this.ctx.storage.put("lastEventTime", message.time_us),
);
}
})
.catch(console.error);
}
}
});
});
});
}
async #resetAlarm() {
const alarm = await this.ctx.storage.getAlarm();
// If we have an alarm set that is not in the past then
// don't set another one.
if (alarm && alarm - Date.now() > 0) {
return;
}
// Set an alarm 5 seconds in the future to ensure the DO stays alive.
await this.ctx.storage.setAlarm(Date.now() + 5000);
}
}
import { AtUri } from "@atproto/syntax";
import { DurableObject } from "cloudflare:workers";
type RecordList = {
cursor?: string;
records: {
uri: string;
cid: string;
value: { $type: string };
}[];
};
export class PDS extends DurableObject {
constructor(state: DurableObjectState, env: Env) {
super(state, env);
}
async createRecord(
{ cid, uri }: { cid: string; uri: string },
record: unknown,
): Promise<{ success: boolean }> {
const {
env: { REPO },
} = this;
try {
const { host } = AtUri.make(uri);
using res = await REPO.getByName(host).createRecord({ cid, uri }, record);
return { success: res.success };
} catch (error) {
console.error("Failed to call REPO.createRecord", error);
return { success: false };
}
}
async deleteRecord({
collection,
repo,
rkey,
}: {
collection: string;
repo: string;
rkey: string;
}) {
const {
env: { REPO },
} = this;
try {
using res = await REPO.getByName(repo).deleteRecord({ collection, rkey });
return { success: res.success };
} catch (error) {
console.error("Failed to call REPO.deleteRecord", error);
return { success: false };
}
}
async listRecords({
collection,
cursor,
limit,
repo,
reverse,
}: {
collection: string;
cursor?: string;
limit?: number;
repo: string;
reverse?: boolean;
}): Promise<RecordList> {
const repoStub = this.env.REPO.getByName(repo);
using results = await repoStub.listRecords({
collection,
cursor,
limit,
repo,
reverse,
});
return {
records: results.records,
cursor: results.cursor,
};
}
}
export class RepoStorage extends DurableObject {
constructor(state: DurableObjectState, env: Env) {
super(state, env);
const {
storage: { sql },
} = this.ctx;
sql.exec(/* SQL */ `
CREATE TABLE IF NOT EXISTS records (
cid TEXT NOT NULL,
collection TEXT NOT NULL,
rkey TEXT NOT NULL,
record TEXT NOT NULL,
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_records_cid_collection_rkey ON records (cid, collection, rkey);
`);
}
async createRecord(
{ cid, uri }: { cid: string; uri: string },
record: unknown,
): Promise<{ success: boolean }> {
const {
storage: { sql },
} = this.ctx;
try {
const { collection, rkey } = AtUri.make(uri);
const inserted = sql.exec(
/* SQL */ `
INSERT INTO records (cid, collection, rkey, record)
VALUES (?, ?, ?, ?);
`,
cid,
collection,
rkey,
JSON.stringify(record),
);
if (!inserted.rowsWritten) {
throw new Error("records row not written to database");
}
return { success: true };
} catch (error) {
console.error("Failed to insert record", error);
return { success: false };
}
}
async deleteRecord({
collection,
rkey,
}: {
collection: string;
rkey: string;
}): Promise<{ success: boolean }> {
const {
storage: { sql },
} = this.ctx;
try {
sql.exec(
/* SQL */ `
DELETE FROM records
WHERE \`collection\` = ? AND rkey = ?;
`,
collection,
rkey,
);
return { success: true };
} catch (error) {
console.error("Failed to delete record", error);
return { success: false };
}
}
async listRecords({
collection,
cursor,
limit,
repo,
reverse,
}: {
collection: string;
cursor?: string;
limit?: number;
repo: string;
reverse?: boolean;
}): Promise<RecordList> {
const {
storage: { sql },
} = this.ctx;
limit = Math.max(0, Math.min(limit ?? 100, 100));
const order = reverse ? "ASC" : "DESC";
const params: (string | number)[] = [collection];
if (cursor) {
params.push(cursor);
}
params.push(limit);
const query = /* SQL */ `
SELECT cid, rkey, record
FROM records
WHERE collection = ?
${cursor ? `AND rkey ${reverse ? ">" : "<"} ?` : ""}
ORDER BY rkey ${order}, cid ${order}
LIMIT ?;
`;
const results = sql.exec<{
cid: string;
rkey: string;
record: string;
}>(query, ...params);
const records = results.toArray().map((row) => ({
uri: AtUri.make(repo, collection, row.rkey).href,
cid: row.cid as string,
value: {
$type: collection,
...JSON.parse(row.record as string),
},
}));
let newCursor: string | undefined;
if (records.length === limit) {
const uri = AtUri.make(records[records.length - 1].uri, collection);
newCursor = uri.rkey;
}
return {
records,
cursor: newCursor,
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment