Skip to content

Instantly share code, notes, and snippets.

@rochi88
Forked from habibutsu/partitioning_functions.sql
Created December 4, 2024 02:00
Show Gist options
  • Save rochi88/1a882219a4be9f283148c62e89910ec4 to your computer and use it in GitHub Desktop.
Save rochi88/1a882219a4be9f283148c62e89910ec4 to your computer and use it in GitHub Desktop.
Postgresql / Sharding & Partitioning
create or replace
function partition_insert_trigger() returns trigger
language 'plpgsql' as $$
declare
-- Trigger arguments: name of the timestamp column (TG_ARGV[0]) and a
-- to_char() format (TG_ARGV[1]) that derives the partition suffix.
column_name text := TG_ARGV[0];
column_format text := TG_ARGV[1];
column_value timestamp;
table_name text;
insert_stmt text;
create_table_tpl text := '
create table {table_name} (
like {base_table_name} including all
);
alter table {table_name} inherit {base_table_name};
';
begin
-- Read the partitioning column out of the row being inserted
-- (%I quotes the identifier so unusual column names are safe).
execute format('select $1.%I', column_name) using NEW into column_value;
-- Partition name: <base_table>_<formatted timestamp>, e.g. example_y16_m01.
table_name := format('%s_%s', TG_TABLE_NAME, to_char(column_value, column_format));
insert_stmt := format('insert into %I select ($1).*', table_name);
begin
execute insert_stmt using NEW;
exception
when undefined_table then
-- The partition does not exist yet: create it, tolerating a
-- concurrent creation by another transaction, then retry once.
begin
execute replace(replace(create_table_tpl,
'{table_name}', table_name),
'{base_table_name}', TG_TABLE_NAME);
exception
when duplicate_table then
null; -- created concurrently by someone else: ignore
end;
execute insert_stmt using NEW;
end;
return NEW;
end;
$$;
create or replace
function init_partition(table_name text, column_name text, partition_interval text) returns void
language 'plpgsql' as $$
declare
column_format text;
begin
-- Map the requested interval onto a to_char() format that yields one
-- distinct partition-name suffix per period.
case partition_interval
when 'monthly' then column_format := '"y"yy_"m"mm';
when 'weekly'  then column_format := '"y"yy_"m"mm_"w"w';
when 'daily'   then column_format := '"y"yy_"m"mm_"d"dd';
when 'hourly'  then column_format := '"y"yy_"m"mm_"d"dd_"h"hh24';
else
raise exception 'Must use a predefined time interval: monthly, weekly, daily, hourly.';
end case;
-- Build the DDL with format(): %I quotes identifiers and %L quotes
-- literals, avoiding the injection risk of plain string replacement.
execute format('
create trigger %I
before insert on %I
for each row
execute procedure partition_insert_trigger(%L, %L);',
'on_inserted_' || table_name, table_name, column_name, column_format);
end;
$$;
-- Registry of allocated chunks: one row per (base table, chunk number).
CREATE TABLE chunks (
id serial,
relation_id oid not null, -- oid of the base table (via ::regclass)
chunk_id int not null,    -- logical chunk number within that table
PRIMARY KEY (id),
-- create_chunk() assumes at most one row per pair; its exists-check is
-- racy under concurrency, so enforce the invariant at the schema level.
CONSTRAINT chunks_relation_id_chunk_id_key UNIQUE (relation_id, chunk_id)
);
/*
Consistency Unique ID - CUID layout (packed into one 64-bit integer):
 - 41 bits: milliseconds since a custom epoch (covers roughly 69 years)
 - 13 bits: chunk id (allows up to 8192 chunks)
 - 10 bits: auto-incrementing sequence, modulus 1024
   (so up to 1024 ids can be generated per chunk, per millisecond)
*/
create type cuid as (created_at timestamp, chunk_id bigint, seq_id bigint);
create or replace
function next_id(chunk_id int, def_value text) returns bigint
language 'plpgsql' as $$
declare
result bigint;
our_epoch bigint := 1451595600000; -- 2016, in milliseconds
seq_id bigint;
now_millis bigint;
begin
-- use def_value from base table (maybe will be better avoid this):
-- it is the base table's default expression (a sequence nextval call)
execute format('select %s', def_value) into seq_id;
select floor(extract(epoch from clock_timestamp()) * 1000) into now_millis;
-- Pack the CUID: 41 bits of time | 13 bits of chunk id | 10 bits of sequence.
result := (now_millis - our_epoch) << 23;
-- Mask both fields to their declared widths: without the modulus a
-- sequence value >= 1024 (or a chunk id >= 8192) would bleed into the
-- neighbouring bit field and corrupt the packed id.
result := result | ((chunk_id % 8192) << 10);
result := result | (seq_id % 1024);
return result;
end;
$$;
create or replace
function unpack_cuid(id bigint) returns cuid
language 'plpgsql' as $$
declare
-- Must match the epoch used by next_id() (2016, in milliseconds).
our_epoch bigint := 1451595600000;
begin
-- Reverse the packing done by next_id(): the high 41 bits hold
-- milliseconds since the epoch, the next 13 bits the chunk id, and the
-- low 10 bits the sequence number.
return row(
to_timestamp(((id >> 23) + our_epoch) * 0.001)::timestamp,
(id & ((1 << 23) - 1)) >> 10,
id & ((1 << 10) - 1)
)::cuid;
end;
$$;
create or replace
function create_chunk(table_name text, chunk_id int) returns int
language 'plpgsql' as $$
-- Creates chunk table <table_name>_<NNN> inheriting from the base table,
-- with a CHECK tying its rows to this chunk id and an id default that
-- generates CUIDs via next_id(). Returns chunk_id on creation, or null
-- if the chunk is already registered in the chunks table.
declare
-- Default expression of the base table's id column (a sequence nextval),
-- forwarded to next_id() inside the chunk's own default.
def_value text;
query_tpl text :=
'create table {table_name}_{chunk_id} (
like {table_name} including all,
check( (unpack_cuid(id)).chunk_id = {chunk_id})
);
alter table {table_name}_{chunk_id} inherit {table_name};
alter table {table_name}_{chunk_id} alter column id
set default next_id({chunk_id}, ''{def_value}'');';
begin
-- Already registered: nothing to do. NOTE(review): check-then-insert is
-- racy under concurrent callers unless chunks has a unique constraint.
if exists (
select * from chunks
where
chunks.relation_id = create_chunk.table_name::regclass and
chunks.chunk_id = create_chunk.chunk_id
) then
return null;
end if;
--- use default value from base table
select column_default into def_value
from information_schema.columns as is_columns
where is_columns.table_name = create_chunk.table_name and column_name = 'id';
insert into chunks (relation_id, chunk_id)
values(table_name::regclass, chunk_id);
-- Fill in the template: chunk id is zero-padded to 3 digits ('000'
-- format; trim strips to_char's leading sign space), and single quotes
-- in def_value are doubled so it survives embedding in the literal.
execute replace(
replace(
replace(query_tpl,
'{table_name}', table_name),
'{chunk_id}', trim(to_char(chunk_id, '000'))),
'{def_value}', replace(def_value, '''', ''''''));
return chunk_id;
end;
$$;
create or replace
function create_chunks(
table_name text, from_number int, to_number int) returns int[]
language 'plpgsql' as $$
declare
created_ids int[];
begin
-- Create every chunk in [from_number, to_number] and collect the ids of
-- those actually created; create_chunk() yields null for chunks that
-- already exist, and array(...) gives an empty array when none are new.
created_ids := array(
select chunk.new_id
from (
select create_chunk(table_name, n) as new_id
from generate_series(from_number, to_number) as n
) as chunk
where chunk.new_id is not null
);
return created_ids;
end;
$$;

First, create a table for our experiments:

create table example (
    id bigserial,
    data character varying(64) not null,
    created_at timestamp without time zone DEFAULT now(),
    primary key (id)
);

Second, initialize the chunks, which can later be moved to other shards if needed:

select create_chunks('example', 1, 4);
 create_chunks 
---------------
 {1,2,3,4}
(1 row)

Third, each chunk can be partitioned based on its timestamp column:

select init_partition(
    'example_' || trim(to_char(chunk_id, '000')), 'created_at', 'hourly')
from chunks where relation_id = 'example'::regclass;

 init_partition 
----------------
 example_001
 example_002
 example_003
 example_004
(4 rows)

Finally, insert some data and inspect the result:

insert into example_001 (data, created_at) values('test data', timestamp '2016-01-12 17:00:00');
insert into example_002 (data, created_at) values('test data', timestamp '2016-02-12 18:00:00');
insert into example_001 (data, created_at) values('test data', timestamp '2016-01-12 18:00:00');
insert into example_002 (data, created_at) values('test data', timestamp '2016-02-12 17:00:00');

As a result, the database contains the following tables:

test-server=> \d
 Schema |            Name             |   Type   |    Owner    
--------+-----------------------------+----------+-------------
 public | chunks                      | table    | test-server
 public | chunks_id_seq               | sequence | test-server
 public | example                     | table    | test-server
 public | example_001                 | table    | test-server
 public | example_001_y16_m01_d12_h17 | table    | test-server
 public | example_001_y16_m01_d12_h18 | table    | test-server
 public | example_002                 | table    | test-server
 public | example_002_y16_m02_d12_h17 | table    | test-server
 public | example_002_y16_m02_d12_h18 | table    | test-server
 public | example_003                 | table    | test-server
 public | example_004                 | table    | test-server
 public | example_id_seq              | sequence | test-server
(12 rows)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment