Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active February 24, 2024 13:17
Show Gist options
  • Save marcocitus/1ac72e7533dbb01801973ee51f89fecf to your computer and use it in GitHub Desktop.
Save marcocitus/1ac72e7533dbb01801973ee51f89fecf to your computer and use it in GitHub Desktop.

Revisions

  1. marcocitus revised this gist Mar 18, 2019. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions rollups.sql
    Original file line number Diff line number Diff line change
    @@ -40,9 +40,9 @@ BEGIN
    * such that writes can resume.
    */
    BEGIN
    EXECUTE format('LOCK %s IN EXCLUSIVE MODE', table_to_lock);
    RAISE 'release table lock';
    EXCEPTION WHEN OTHERS THEN
    EXECUTE format('LOCK %s IN SHARE ROW EXCLUSIVE MODE', table_to_lock);
    RAISE 'release table lock' USING ERRCODE = 'RLTBL';
    EXCEPTION WHEN SQLSTATE 'RLTBL' THEN
    END;

    /*
  2. marcocitus revised this gist Jun 14, 2018. 1 changed file with 4 additions and 0 deletions.
    4 changes: 4 additions & 0 deletions rollups.sql
    Original file line number Diff line number Diff line change
    @@ -22,6 +22,10 @@ BEGIN
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
    RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;

    IF window_end IS NULL THEN
    /* sequence was never used */
    window_end := 0;
  3. marcocitus created this gist Jun 7, 2018.
    57 changes: 57 additions & 0 deletions example.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,57 @@
    -- Create the raw events table
    CREATE TABLE page_views (
    site_id int,
    path text,
    client_ip inet,
    view_time timestamptz default now(),
    view_id bigserial
    );

    -- Allow fast lookups of ranges of sequence IDs
    CREATE INDEX view_id_idx ON page_views USING BRIN (view_id);

    -- Citus only: distribute the table by site ID
    SELECT create_distributed_table('page_views', 'site_id');

    -- Create the rollup table
    CREATE TABLE page_views_1min (
    site_id int,
    path text,
    period_start timestamptz,
    view_count bigint,
    primary key (site_id, path, period_start)
    );

    -- Citus only: distribute the table by site ID
    SELECT create_distributed_table('page_views_1min', 'site_id');

    -- Add our 1-minute rollup to the rollups table
    INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
    VALUES ('page_views_1min_rollup', 'page_views', 'page_views_view_id_seq');

    -- Define the aggregation
    CREATE OR REPLACE FUNCTION do_page_view_aggregation(OUT start_id bigint, OUT end_id bigint)
    RETURNS record
    LANGUAGE plpgsql
    AS $function$
    BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end INTO start_id, end_id
    FROM incremental_rollup_window('page_views_1min_rollup');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN RETURN; END IF;

    /* aggregate the page views */
    INSERT INTO page_views_1min (site_id, path, period_start, view_count)
    SELECT site_id, path, date_trunc('minute', view_time), count(*) AS view_count
    FROM page_views
    WHERE view_id BETWEEN start_id AND end_id
    GROUP BY site_id, path, date_trunc('minute', view_time)
    ON CONFLICT (site_id, path, period_start) DO UPDATE
    SET view_count = page_views_1min.view_count + EXCLUDED.view_count;
    END;
    $function$;

    -- Run the aggregation
    SELECT * FROM do_page_view_aggregation();
    49 changes: 49 additions & 0 deletions rollups.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,49 @@
    CREATE TABLE rollups (
    name text primary key,
    event_table_name text not null,
    event_id_sequence_name text not null,
    last_aggregated_id bigint default 0
    );

    CREATE OR REPLACE FUNCTION incremental_rollup_window(rollup_name text, OUT window_start bigint, OUT window_end bigint)
    RETURNS record
    LANGUAGE plpgsql
    AS $function$
    DECLARE
    table_to_lock regclass;
    BEGIN
    /*
    * Perform aggregation from the last aggregated ID + 1 up to the last committed ID.
    * We do a SELECT .. FOR UPDATE on the row in the rollup table to prevent
    * aggregations from running concurrently.
    */
    SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF window_end IS NULL THEN
    /* sequence was never used */
    window_end := 0;
    RETURN;
    END IF;

    /*
    * Play a little trick: We very briefly lock the table for writes in order to
    * wait for all pending writes to finish. That way, we are sure that there are
    * no more uncommitted writes with a identifier lower or equal to window_end.
    * By throwing an exception, we release the lock immediately after obtaining it
    * such that writes can resume.
    */
    BEGIN
    EXECUTE format('LOCK %s IN EXCLUSIVE MODE', table_to_lock);
    RAISE 'release table lock';
    EXCEPTION WHEN OTHERS THEN
    END;

    /*
    * Remember the end of the window to continue from there next time.
    */
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;
    END;
    $function$;