import EventEmitter from 'node:events';

/**
 * @typedef {import('node:net').Socket} Socket
 * @typedef {import('@electric-sql/pglite').PGlite} PGlite
 * @typedef {Socket['pause']} SocketPause
 * @typedef {Socket['resume']} SocketResume
 * @typedef {Socket['write']} SocketWrite
 * @typedef {Socket['end']} SocketEnd
 * @typedef {Socket['destroy']} SocketDestroy
 */

/**
 * Creates a socket instance for usage in postgres.js by directly wrapping the PGlite database.
 *
 * Every chunk passed to `write()` is handed to `db.execProtocolRaw()`; the bytes it
 * resolves with are emitted back as a `'data'` event (or queued while the socket is paused).
 *
 * Example usage:
 * ```js
 * import { PGlite } from "@electric-sql/pglite";
 * import postgres from "postgres";
 * import { createPGliteSocket } from "./pglite-socket.mjs";
 *
 * const db = new PGlite();
 * const options = {
 *   max: 1,
 *   socket: () => createPGliteSocket(db),
 * };
 * const sql = postgres(options);
 * try {
 *   const description = await sql`select 1`.describe();
 *   // {"string":"select 1","types":[],"name":"kawf0mwrpjn2",
 *   //  "columns":[{"name":"?column?","table":0,"number":0,"type":23}]}
 *   console.log(JSON.stringify(description));
 * } finally {
 *   sql.end();
 * }
 * ```
 * @param {PGlite} db Database whose `execProtocolRaw` method receives every written chunk.
 * @returns {Socket} An object that implements the part of the Socket interface that
 *   postgres.js needs to communicate with the PGlite database: `on`, `once`,
 *   `removeListener`, `removeAllListeners`, `write`, `pause`, `resume`, `end`,
 *   `destroy` and a read-only `readyState`.
 */
export function createPGliteSocket(db) {
  /** @type {Socket['readyState']} */
  let readyState = 'open';

  /**
   * Responses that could not be emitted yet because the socket was paused.
   * @type {Uint8Array[]}
   */
  const readBuffer = [];

  /** @typedef {{promise: Promise<void>, resolve: () => void}} CurrentPause */
  /**
   * Non-null while the socket is paused; `promise` settles on `resume()`.
   * @type {CurrentPause | null}
   */
  let currentPause = null;

  /**
   * The most recently started `execProtocolRaw` call that has not settled yet.
   * @type {Promise<Uint8Array> | null}
   */
  let currentExec = null;

  const eventEmitter = new EventEmitter();

  /**
   * Sends one chunk to the database and resolves with its raw response.
   * @param {Uint8Array} data
   */
  async function execute(data) {
    const exec = db.execProtocolRaw(data);
    currentExec = exec;
    try {
      return await exec;
    } finally {
      // Clear the marker on failure as well, but only if no newer call replaced it.
      if (exec === currentExec) {
        currentExec = null;
      }
    }
  }

  /**
   * Queues `result` (if any) behind already buffered responses and emits
   * everything in order as `'data'` events until the socket gets paused.
   * @param {Uint8Array | undefined} result
   */
  function emitPendingData(result = undefined) {
    if (result) {
      readBuffer.push(result);
    }
    // Re-check the pause flag on every iteration: a listener may call pause().
    while (!currentPause && readBuffer.length > 0) {
      const chunk = readBuffer.shift();
      eventEmitter.emit('data', Buffer.from(chunk));
    }
  }

  /**
   * Resolves once every buffered response has been emitted, waiting for
   * `resume()` whenever the socket is paused.
   */
  async function emitAllPendingData() {
    while (readBuffer.length > 0) {
      while (currentPause) {
        await currentPause.promise;
      }
      emitPendingData();
    }
  }

  /**
   * Executes one written chunk, then emits `'drain'`, invokes the write
   * callback and finally emits the response.
   *
   * The callback is invoked directly instead of being registered as a
   * `'drain'` listener, so that with several writes in flight each callback
   * only fires for its own write (and only receives its own error).
   *
   * @param {Uint8Array} data
   * @param {((err?: Error | null) => void) | undefined} cb
   */
  async function processWrite(data, cb) {
    /** @type {Uint8Array} */
    let response;
    try {
      response = await execute(data);
    } catch (err) {
      const failure = err instanceof Error ? err : new Error(`${err}`);
      // Still emit 'drain' so that a writer waiting for it is released.
      eventEmitter.emit('drain', failure);
      if (cb) {
        cb(failure);
      }
      return;
    }
    eventEmitter.emit('drain');
    if (cb) {
      cb();
    }
    emitPendingData(response);
  }

  /** @typedef {Uint8Array | string} SocketWriteData */
  /** @typedef {(err?: Error | null) => void} SocketWriteCb */
  /** @typedef {BufferEncoding} SocketWriteEncoding */

  /**
   * Accepts `write(data)`, `write(data, cb)` and `write(data, encoding, cb)`.
   * Always returns `false`; `'drain'` is emitted once the chunk was executed.
   * @type {SocketWrite}
   */
  const write = function (
    /** @type {SocketWriteData} */ data,
    /** @type {SocketWriteEncoding | SocketWriteCb | undefined} */ encoding = 'utf8',
    /** @type {SocketWriteCb | undefined} */ cb = undefined
  ) {
    const callback = typeof encoding === 'function' ? encoding : cb;
    const bytes = typeof data === 'string'
      ? Buffer.from(data, typeof encoding === 'string' ? encoding : 'utf8')
      : data;
    // Intentionally not awaited: completion is reported via callback and 'drain'.
    void processWrite(bytes, callback);
    return false;
  };

  /**
   * Stops `'data'` emission; responses are buffered until `resume()`.
   * @type {SocketPause}
   */
  const pause = function () {
    if (!currentPause) {
      /** @type {() => void} */
      let resolve = () => { };
      const promise = /** @type {Promise<void>} */ (
        new Promise((res) => { resolve = res; })
      );
      currentPause = { promise, resolve };
    }
    return getInstance();
  };

  /**
   * Lifts a pause and flushes the responses that were buffered meanwhile.
   * @type {SocketResume}
   */
  const resume = function () {
    if (currentPause) {
      const { resolve } = currentPause;
      currentPause = null;
      resolve();
      // Flush asynchronously so listeners are not re-entered from inside resume().
      queueMicrotask(() => emitPendingData());
    }
    return getInstance();
  };

  /** Marks the socket as closed and emits `'close'` exactly once. */
  function close() {
    if (readyState === 'open') {
      readyState = 'closed';
      eventEmitter.emit('close');
    }
  }

  /** @typedef {Uint8Array | string} SocketEndData */
  /** @typedef {BufferEncoding} SocketEndEncoding */
  /** @typedef {() => void} SocketEndCallback */

  /**
   * Accepts `end()`, `end(cb)`, `end(data)`, `end(data, cb)`,
   * `end(data, encoding)` and `end(data, encoding, cb)`. Writes the optional
   * final chunk, waits until all buffered responses were emitted, invokes the
   * callback and closes the socket.
   * @type {SocketEnd}
   */
  const end = function (
    /** @type {SocketEndData | SocketEndCallback | undefined} */ data,
    /** @type {SocketEndEncoding | SocketEndCallback | undefined} */ encoding = undefined,
    /** @type {SocketEndCallback | undefined} */ callback = undefined
  ) {
    // Normalize the overloaded signature into a payload and a callback.
    let payload = data;
    let onEnded = callback;
    if (typeof data === 'function') {
      onEnded = data;
      payload = undefined;
    } else if (typeof encoding === 'function') {
      onEnded = encoding;
    }

    const finish = () => {
      // Start one microtask later: when invoked as a write callback, the
      // response of that write is only queued right after the callback returns.
      Promise.resolve()
        .then(emitAllPendingData)
        .then(() => {
          if (onEnded) {
            onEnded();
          }
          close();
        });
    };

    if (payload === undefined || payload === null) {
      finish();
    } else {
      write(payload, typeof encoding === 'string' ? encoding : 'utf8', finish);
    }
    return getInstance();
  };

  /**
   * Closes the socket immediately without flushing buffered responses.
   * @type {SocketDestroy}
   */
  const destroy = function () {
    close();
    return getInstance();
  };

  function getInstance() {
    return /** @type {Socket} */ (result);
  }

  const result = {
    on: eventEmitter.on.bind(eventEmitter),
    once: eventEmitter.once.bind(eventEmitter),
    removeListener: eventEmitter.removeListener.bind(eventEmitter),
    removeAllListeners: eventEmitter.removeAllListeners.bind(eventEmitter),
    write,
    pause,
    resume,
    end,
    destroy,
  };
  Object.defineProperty(result, 'readyState', {
    get: () => readyState,
  });

  return getInstance();
}