-- Incremental per-minute rollup of page views.
--
-- Raw events land in page_views; do_page_view_aggregation() folds each new
-- range of view_id values into per-minute counts in page_views_1min.
--
-- Depends on objects defined outside this script: the rollups bookkeeping
-- table and the incremental_rollup_window() function. On Citus, also on the
-- create_distributed_table() function.

-- Create the raw events table.
-- site_id, path and view_time are NOT NULL because they feed the primary key
-- of page_views_1min. A single row with a NULL in any of them would make the
-- aggregation INSERT fail. Any bookkeeping done in that transaction is rolled
-- back with it, so the same rows would be retried, and fail, on every run.
-- Rejecting such rows at write time keeps the rollup from getting stuck.
CREATE TABLE page_views (
    site_id   int         NOT NULL,
    path      text        NOT NULL,
    client_ip inet,
    view_time timestamptz NOT NULL DEFAULT now(),
    view_id   bigserial
);

-- Allow fast lookups of ranges of sequence IDs.
-- BRIN stays small and works well here only if the table is effectively
-- append-only, so that physical row order tracks view_id order (assumption;
-- confirm no bulk updates or deletes reorder the heap).
CREATE INDEX view_id_idx ON page_views USING BRIN (view_id);

-- Citus only: distribute the table by site ID.
SELECT create_distributed_table('page_views', 'site_id');

-- Create the rollup table: one row per site, path and minute.
CREATE TABLE page_views_1min (
    site_id      int,
    path         text,
    period_start timestamptz,
    view_count   bigint NOT NULL,
    PRIMARY KEY (site_id, path, period_start)
);

-- Citus only: distribute the table by site ID.
-- This co-locates it with page_views, so the INSERT ... SELECT below stays
-- shard-local.
SELECT create_distributed_table('page_views_1min', 'site_id');

-- Add our 1-minute rollup to the rollups table.
-- 'page_views_view_id_seq' is the sequence PostgreSQL implicitly creates for
-- the bigserial column page_views.view_id.
INSERT INTO rollups (name, event_table_name, event_id_sequence_name)
VALUES ('page_views_1min_rollup', 'page_views', 'page_views_view_id_seq');

-- Define the aggregation.
-- Returns the view_id window (start_id, end_id) that this call considered.
-- When start_id > end_id there was nothing new, and nothing is written.
CREATE OR REPLACE FUNCTION do_page_view_aggregation(OUT start_id bigint, OUT end_id bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    /* determine which page views we can safely aggregate */
    SELECT window_start, window_end
    INTO start_id, end_id
    FROM incremental_rollup_window('page_views_1min_rollup');

    /* exit early if there are no new page views to aggregate */
    IF start_id > end_id THEN
        RETURN;
    END IF;

    /*
     * Aggregate the page views.
     * BETWEEN is inclusive on both ends. This assumes incremental_rollup_window
     * hands out an inclusive [window_start, window_end] ID range; confirm
     * against its definition.
     * ON CONFLICT adds the new counts to any row already stored for the same
     * (site_id, path, minute), e.g. when late events from an earlier minute
     * arrive in a later window.
     */
    INSERT INTO page_views_1min (site_id, path, period_start, view_count)
    SELECT
        site_id,
        path,
        date_trunc('minute', view_time),
        count(*) AS view_count
    FROM page_views
    WHERE view_id BETWEEN start_id AND end_id
    GROUP BY site_id, path, date_trunc('minute', view_time)
    ON CONFLICT (site_id, path, period_start) DO UPDATE
    SET view_count = page_views_1min.view_count + EXCLUDED.view_count;
END;
$function$;

-- Run the aggregation.
SELECT start_id, end_id FROM do_page_view_aggregation();