Skip to content

Instantly share code, notes, and snippets.

@MAST1999
Created June 15, 2025 15:54
Show Gist options
  • Save MAST1999/2fdbf0d71e31686c3edcef116435064c to your computer and use it in GitHub Desktop.
Save MAST1999/2fdbf0d71e31686c3edcef116435064c to your computer and use it in GitHub Desktop.

Revisions

  1. MAST1999 created this gist Jun 15, 2025.
    209 changes: 209 additions & 0 deletions drizzle-live-query.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,209 @@
    function processQueryResults<T>(
    query: T,
    rawRows: any[],
    ): Record<string, any>[] {
    return rawRows.map((row) => {
    return mapRelationalRow(
    (query as any).schema,
    (query as any).tableConfig,
    Object.values(row),
    (query as any)._getQuery().selection,
    );
    });
    }

    function createQueryResult<T extends PgRelationalQuery<unknown>>(
    mappedRows: Record<string, any>[],
    mode: "many" | "one",
    items?: { affectedRows?: number; fields?: any[]; blob?: any },
    ): { data: Awaited<T> } & Omit<LiveQueryResults<unknown>, "rows"> {
    return {
    data: (mode === "many"
    ? mappedRows
    : mappedRows[0] || undefined) as Awaited<T>,
    affectedRows: items?.affectedRows || 0,
    fields: items?.fields || [],
    blob: items?.blob,
    // @ts-expect-error this exists
    limit: items?.limit,
    // @ts-expect-error this exists
    offset: items?.offset,
    // @ts-expect-error this exists
    totalCount: items?.totalCount,
    };
    }

    function mapRelationalRow(
    tablesConfig: TablesRelationalConfig,
    tableConfig: TableRelationalConfig,
    row: unknown[],
    buildQueryResultSelection: BuildRelationalQueryResult["selection"],
    mapColumnValue: (value: unknown) => unknown = (value) => value,
    ): Record<string, unknown> {
    const result: Record<string, unknown> = {};

    for (const [
    selectionItemIndex,
    selectionItem,
    ] of buildQueryResultSelection.entries()) {
    if (selectionItem.isJson) {
    const relation = tableConfig.relations[selectionItem.tsKey]!;
    const rawSubRows = row[selectionItemIndex] as
    | unknown[]
    | null
    | [null]
    | string;
    const subRows =
    typeof rawSubRows === "string"
    ? (JSON.parse(rawSubRows) as unknown[])
    : rawSubRows;
    result[selectionItem.tsKey] = is(relation, One)
    ? subRows &&
    mapRelationalRow(
    tablesConfig,
    tablesConfig[selectionItem.relationTableTsKey!]!,
    subRows,
    selectionItem.selection,
    mapColumnValue,
    )
    : (subRows as unknown[][]).map((subRow) =>
    mapRelationalRow(
    tablesConfig,
    tablesConfig[selectionItem.relationTableTsKey!]!,
    subRow,
    selectionItem.selection,
    mapColumnValue,
    ),
    );
    } else {
    const value = mapColumnValue(row[selectionItemIndex]);
    const field = selectionItem.field!;
    let decoder: any;
    if (is(field, Column)) {
    decoder = field;
    } else if (is(field, SQL)) {
    // @ts-expect-error Internal field
    decoder = field.decoder;
    } else {
    // @ts-expect-error Internal field
    decoder = field.sql.decoder;
    }
    result[selectionItem.tsKey] =
    value === null ? null : decoder.mapFromDriverValue(value);
    }
    }

    return result;
    }
    type LivePagination = {
    offset: number;
    limit: number;
    };

    export interface DrizzleResult<T> extends Omit<Results, "rows"> {
    data: T;
    totalCount: number;
    offset: number;
    limit: number;
    }

    export function createDrizzleLive<
    T extends
    | {
    _: { result: any };
    toSQL: () => { params: Array<any>; sql: string };
    }
    | PgRelationalQuery<unknown>,
    TValue extends T extends PgRelationalQuery<unknown>
    ? Awaited<T>
    : T["_"]["result"],
    >(fn: (query: Db) => T, paginationSignal?: Accessor<LivePagination>) {
    const { pg, db } = useDB();
    const [shapreResult, setResult] = createSignal<
    | (DrizzleResult<TValue> & {
    totalCount: number;
    offset: number;
    limit: number;
    })
    | undefined
    >(undefined, { name: "LiveQueryResult" });

    const initialPagination = paginationSignal?.();
    const shape = createAsync(
    async () => {
    const query = fn(db);
    const isRelational = is(query, PgRelationalQuery);
    const { sql, params } = query.toSQL();

    performance.mark("live-query-start", {
    detail: { sql, params },
    });

    const s = await pg.live.query({
    query: sql,
    params,
    ...initialPagination,
    callback: updateResult,
    });

    performance.mark("live-query-end", { detail: { sql, params } });
    performance.measure("live-query", "live-query-start", "live-query-end");

    s.subscribe(updateResult);

    return s;

    function updateResult(items: LiveQueryResults<Record<string, any>>) {
    const res = isRelational
    ? createQueryResult(
    processQueryResults(query, items.rows),
    (query as any).mode,
    items,
    )
    : ({
    data: items.rows,
    fields: items.fields,
    affectedRows: items.affectedRows || 0,
    blob: items.blob,
    limit: items?.limit,
    offset: items?.offset,
    totalCount: items?.totalCount,
    } as DrizzleResult<TValue>);
    setResult(res as DrizzleResult<TValue>);
    }
    },
    {
    name: "LiveQueryAsyncPgLiveQuery",
    },
    );

    createEffect(
    () => {
    const pagination = paginationSignal?.();
    const paginationChanged = initialPagination !== pagination;
    if (!pagination || !paginationChanged) {
    return;
    }

    shape()?.refresh(pagination);
    },
    undefined,
    { name: "LiveQueryPaginationRefreshEffect" },
    );

    onCleanup(() => {
    shape()?.unsubscribe();
    });

    const result = createMemo(
    () => {
    return shapreResult();
    },
    undefined,
    {
    name: "LiveQueryResultMemo",
    },
    );

    return { result, shape };
    }