# Timeseries Data in MySQL

* IoT readings
* Performance metrics
* Heartbeat systems

We operate on the timeseries data, chunk it down into buckets, and run mins, avgs and maxes over all of that data.

* table

```sql
CREATE TABLE timeseries (
    `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT,
    `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT 'default',
    `v` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT '',
    `datapoint` int UNSIGNED NOT NULL DEFAULT 0,
    `dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
)
```

* all rows

```sql
SELECT
    k,
    v,
    datapoint,
    UNIX_TIMESTAMP( dt ) AS ts,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6 hour chart of 60 points on chart
    COUNT(*) OVER bucket AS count_over_bucket,
    AVG( v ) OVER bucket AS avg_v_over_bucket,
    MAX( v ) OVER bucket AS max_v_over_bucket,
    MIN( v ) OVER bucket AS min_v_over_bucket
FROM `timeseries`
WHERE k = 'default'
WINDOW bucket AS (
    PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY dt
)
```

* filtering intermediate rows and keeping one row per bucket

```sql
WITH windowed_data AS (
    SELECT
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FIRST_VALUE( UNIX_TIMESTAMP( dt ) ) OVER bucket AS first_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6 hour chart of 60 points on chart
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF ( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- one key forward within the window for the current partition
    FROM `timeseries`
    WHERE k = 'default'
    WINDOW bucket AS (
        PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
        ORDER BY dt
    )
)
SELECT * FROM windowed_data WHERE final_row_in_bucket = 1
```

* for all of the keys, only the last 6 hours of data to speed things up

```sql
WITH windowed_data AS (
    SELECT
        '6hr_60pt' AS `type`,
        k,
        v,
        datapoint,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        FIRST_VALUE( UNIX_TIMESTAMP( dt ) ) OVER bucket AS first_timestamp,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6 hour chart of 60 points on chart
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF ( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- one key forward within the window for the current partition
    FROM `timeseries`
    WHERE dt >= NOW() - INTERVAL 6 HOUR -- data from last 6 hours
    WINDOW bucket AS (
        PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
        ORDER BY k, dt
    )
)
SELECT * FROM windowed_data WHERE final_row_in_bucket = 1
```

The next speed-up is to create a summary (rollup) table for this computation and only calculate the last remaining bucket at run time: `INSERT INTO timeseries_rollup ( SELECT * FROM windowed_data )`, with a compound index over type, key and bucket, and on collision update the count/avg/min/max. Charts then select directly from the rollup table, so we could drop the `INTERVAL` from 6 hours to 15 minutes and keep updating the rollup table while calculating the timeseries in the background, as sketched below.
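A minimal sketch of that rollup, assuming a hypothetical `timeseries_rollup` table whose primary key over (`type`, `k`, `bucket`) plays the role of the compound index that collisions resolve against; the column names mirror `windowed_data` above, and the concrete types, the `'6hr_60pt'` label and the 15 minute interval are illustrative choices, not part of the original gist:

```sql
-- Hypothetical rollup table: one row per (type, key, bucket),
-- updated in place whenever a bucket is recomputed.
CREATE TABLE timeseries_rollup (
    `type` varchar(32) NOT NULL,
    `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
    `bucket` int UNSIGNED NOT NULL,
    `last_timestamp` int UNSIGNED NOT NULL,
    `count_over_bucket` int UNSIGNED NOT NULL DEFAULT 0,
    `avg_v_over_bucket` double NOT NULL DEFAULT 0,
    `max_v_over_bucket` double NOT NULL DEFAULT 0,
    `min_v_over_bucket` double NOT NULL DEFAULT 0,
    PRIMARY KEY (`type`, `k`, `bucket`) -- compound key over type, key and bucket
);

-- Background job: recompute only the still-changing buckets (last 15 minutes)
-- and upsert them; charts read straight from timeseries_rollup.
INSERT INTO timeseries_rollup
    (`type`, k, bucket, last_timestamp, count_over_bucket,
     avg_v_over_bucket, max_v_over_bucket, min_v_over_bucket)
WITH windowed_data AS (
    SELECT
        '6hr_60pt' AS `type`,
        k,
        FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket,
        UNIX_TIMESTAMP( dt ) AS last_timestamp,
        COUNT(*) OVER bucket AS count_over_bucket,
        AVG( v ) OVER bucket AS avg_v_over_bucket,
        MAX( v ) OVER bucket AS max_v_over_bucket,
        MIN( v ) OVER bucket AS min_v_over_bucket,
        IF ( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket
    FROM `timeseries`
    WHERE dt >= NOW() - INTERVAL 15 MINUTE -- tightened window from the note above
    WINDOW bucket AS (
        PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
        ORDER BY k, dt
    )
)
SELECT `type`, k, bucket, last_timestamp, count_over_bucket,
       avg_v_over_bucket, max_v_over_bucket, min_v_over_bucket
FROM windowed_data
WHERE final_row_in_bucket = 1
ON DUPLICATE KEY UPDATE
    last_timestamp    = VALUES(last_timestamp),
    count_over_bucket = VALUES(count_over_bucket),
    avg_v_over_bucket = VALUES(avg_v_over_bucket),
    max_v_over_bucket = VALUES(max_v_over_bucket),
    min_v_over_bucket = VALUES(min_v_over_bucket);
```

MySQL 8 accepts the `WITH` clause immediately before the `SELECT` of an `INSERT ... SELECT`, so the same windowed CTE can feed the upsert directly; the 15 minute interval just needs to cover at least the buckets that are still being written to.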
* knex migration for the same table shape, here for a booking timeseries

```js
const booking_time_series = 'time_series_booking'

exports.up = async knex => {
    await knex.schema.createTable(booking_time_series, table => {
        // increments() already creates an unsigned auto-incrementing primary key
        table.increments('id')
        table.string('k', 255).collate('ascii_bin').notNullable().defaultTo('default')
        table.string('v', 255).collate('ascii_bin').notNullable().defaultTo('')
        table.integer('datapoint').notNullable().defaultTo(1)
        table.datetime('dt').notNullable().defaultTo(knex.fn.now())
    })

    await knex.schema.table(booking_time_series, table => {
        // covering index for the bucketed reads: filter on dt, partition by k, aggregate v/datapoint
        table.index(['dt', 'k', 'v', 'datapoint'])
    })
}

exports.down = async knex => {
    await knex.schema.dropTable(booking_time_series)
}
```
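For context, writes into that table stay trivial; the key and value below are made-up examples, not part of the migration. Since `6 * 60 * 60 / 60 = 360` seconds, each chart point covers a 6 minute slice, giving the 60 points per 6 hour chart used throughout:

```sql
-- Hypothetical write: one row per observation, with `dt` filled in by its
-- CURRENT_TIMESTAMP default; the bucketed queries above do the rest.
INSERT INTO time_series_booking (k, v, datapoint)
VALUES ('bookings_confirmed', '42', 42);
```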