# Timeseries Data in MySQL

* IoT Readings
* Performance Metrics
* Heartbeat Systems

We operate on the timeseries data by chunking it into time buckets and running min, avg, and max aggregates over each bucket.
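
For a 6-hour chart rendered as 60 points, each bucket spans `6 * 60 * 60 / 60 = 360` seconds. A small illustrative query (not part of the original gist) showing how a timestamp maps to a bucket id:

```sql
-- 6 hours split into 60 chart points => 360-second (6-minute) buckets
SELECT
  UNIX_TIMESTAMP(NOW())              AS ts_now,
  FLOOR(UNIX_TIMESTAMP(NOW()) / 360) AS bucket_now; -- all readings in the same 6-minute window share this bucket id
```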

* table
```sql
CREATE TABLE `timeseries` (
  `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT,
  `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT 'default',
  `v` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL DEFAULT '',
  `datapoint` int UNSIGNED NOT NULL DEFAULT 0,
  `dt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
);
```
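
To try the queries below, a few rows can be seeded; the values here are made up for illustration and are not part of the original gist:

```sql
-- seed a handful of readings (illustrative values only)
INSERT INTO `timeseries` (`k`, `v`, `datapoint`, `dt`) VALUES
  ('default', '42', 1, NOW() - INTERVAL 10 MINUTE),
  ('default', '57', 1, NOW() - INTERVAL 7 MINUTE),
  ('default', '39', 1, NOW() - INTERVAL 3 MINUTE),
  ('cpu_load', '85', 1, NOW() - INTERVAL 2 MINUTE);
```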

* all rows

```sql
SELECT
  k,
  v,
  datapoint,
  UNIX_TIMESTAMP( dt ) AS ts,
  FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6-hour chart with 60 points
  COUNT(*) OVER bucket AS count_over_bucket,
  AVG( v ) OVER bucket AS avg_v_over_bucket,
  MAX( v ) OVER bucket AS max_v_over_bucket,
  MIN( v ) OVER bucket AS min_v_over_bucket
FROM
  `timeseries`
WHERE
  k = 'default'
WINDOW bucket AS (
  PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
  ORDER BY dt
);
```
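
Because the named window has an `ORDER BY`, MySQL's default frame is `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so intermediate rows show running aggregates and only the last row of each bucket carries the full-bucket values (which is exactly what the next query filters down to). If every row should instead show the complete bucket's aggregates, the frame can be made explicit; a minimal variant of the window clause above:

```sql
-- same window, but with an explicit frame spanning the whole partition
WINDOW bucket AS (
  PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
  ORDER BY dt
  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)
```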

* filtering intermediate rows and keeping one row per bucket

```sql
WITH windowed_data AS (
  SELECT
    k,
    v,
    datapoint,
    UNIX_TIMESTAMP( dt ) AS last_timestamp,
    FIRST_VALUE( UNIX_TIMESTAMP( dt )) OVER bucket AS first_timestamp,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6-hour chart with 60 points
    COUNT(*) OVER bucket AS count_over_bucket,
    AVG( v ) OVER bucket AS avg_v_over_bucket,
    MAX( v ) OVER bucket AS max_v_over_bucket,
    MIN( v ) OVER bucket AS min_v_over_bucket,
    IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- looks one row ahead within the partition; NULL means this is the bucket's last row
  FROM
    `timeseries`
  WHERE
    k = 'default'
  WINDOW bucket AS (
    PARTITION BY FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY dt
  )
)
SELECT
  *
FROM
  windowed_data
WHERE
  final_row_in_bucket = 1;
```
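
To feed a chart, the final `SELECT *` above can be narrowed to just the plotted columns and ordered by bucket; a small usage sketch using the aliases defined in the CTE:

```sql
-- one row per bucket, in plotting order
SELECT
  bucket,
  last_timestamp,
  count_over_bucket,
  avg_v_over_bucket,
  min_v_over_bucket,
  max_v_over_bucket
FROM
  windowed_data
WHERE
  final_row_in_bucket = 1
ORDER BY
  bucket;
```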

* for all keys, limited to the last 6 hours of data to speed things up

```sql
WITH windowed_data AS (
  SELECT
    '6hr_60pt' AS `type`,
    k,
    v,
    datapoint,
    UNIX_TIMESTAMP( dt ) AS last_timestamp,
    FIRST_VALUE( UNIX_TIMESTAMP( dt )) OVER bucket AS first_timestamp,
    FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 )) AS bucket, -- 6-hour chart with 60 points
    COUNT(*) OVER bucket AS count_over_bucket,
    AVG( v ) OVER bucket AS avg_v_over_bucket,
    MAX( v ) OVER bucket AS max_v_over_bucket,
    MIN( v ) OVER bucket AS min_v_over_bucket,
    IF( LEAD( k, 1 ) OVER bucket IS NULL, 1, 0 ) AS final_row_in_bucket -- looks one row ahead within the partition; NULL means this is the bucket's last row
  FROM
    `timeseries`
  WHERE
    dt >= NOW() - INTERVAL 6 HOUR -- only the last 6 hours of data
  WINDOW bucket AS (
    PARTITION BY k, FLOOR( UNIX_TIMESTAMP( dt ) / ( 6 * 60 * 60 / 60 ))
    ORDER BY k, dt
  )
)
SELECT
  *
FROM
  windowed_data
WHERE
  final_row_in_bucket = 1;
```
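
The base table as defined above only has its primary key, so neither the per-key lookup nor the 6-hour range scan can use an index. A hedged sketch of a supporting secondary index (the index name and exact column order are suggestions, not part of the original gist):

```sql
-- supports `dt >= NOW() - INTERVAL 6 HOUR` range scans,
-- with k and v available from the index for the per-key bucketing
ALTER TABLE `timeseries`
  ADD INDEX `idx_dt_k_v` (`dt`, `k`, `v`);
```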


Next, to speed things up further, create a summary (rollup) of the computation and, at read time, only calculate the last remaining bucket:

`INSERT INTO timeseries_rollup (SELECT * FROM windowed_data)`

Have a compound index over type, key and bucket; on collision, update count/avg/min/max and select directly from the rollup table. We could then drop the INTERVAL from 6 hours to 15 minutes and keep updating the rollup table while calculating the timeseries in the background.
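
A possible shape for the rollup table and the upsert described above; the table layout and column names are assumptions derived from the aliases in the queries, not taken verbatim from the original gist:

```sql
-- rollup keyed by (type, k, bucket); the compound primary key is the collision target
CREATE TABLE `timeseries_rollup` (
  `type` varchar(32) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
  `k` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL,
  `bucket` bigint UNSIGNED NOT NULL,
  `last_timestamp` bigint UNSIGNED NOT NULL,
  `count_over_bucket` int UNSIGNED NOT NULL,
  `avg_v_over_bucket` double NOT NULL,
  `max_v_over_bucket` double NOT NULL,
  `min_v_over_bucket` double NOT NULL,
  PRIMARY KEY (`type`, `k`, `bucket`)
);

-- each run recomputes whole buckets from the raw rows,
-- so a key collision can simply overwrite the previous aggregates
-- (windowed_data is the CTE from the query above)
INSERT INTO `timeseries_rollup`
SELECT `type`, k, bucket, last_timestamp,
       count_over_bucket, avg_v_over_bucket, max_v_over_bucket, min_v_over_bucket
FROM windowed_data
WHERE final_row_in_bucket = 1
ON DUPLICATE KEY UPDATE
  last_timestamp    = VALUES(last_timestamp),
  count_over_bucket = VALUES(count_over_bucket),
  avg_v_over_bucket = VALUES(avg_v_over_bucket),
  max_v_over_bucket = VALUES(max_v_over_bucket),
  min_v_over_bucket = VALUES(min_v_over_bucket);
```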

```js
// knex migration for a booking timeseries table that mirrors the raw `timeseries` schema
const booking_time_series = 'time_series_booking'

exports.up = async knex => {
  await knex.schema.createTable(booking_time_series, table => {
    // auto-incrementing unsigned integer primary key (increments creates the PK itself)
    table.increments('id')

    // timeseries key identifying the metric
    table.string('k', 255).collate('ascii_bin').notNullable().defaultTo('default')

    // the reading, stored as a string like in the raw table
    table.string('v', 255).collate('ascii_bin').notNullable().defaultTo('')

    // number of raw datapoints this row represents
    table.integer('datapoint').unsigned().notNullable().defaultTo(1)

    // reading time, defaults to insertion time
    table.datetime('dt').notNullable().defaultTo(knex.fn.now())
  })

  await knex.schema.table(booking_time_series, table => {
    // compound index supporting the time-range + key lookups used above
    table.index(['dt', 'k', 'v', 'datapoint'])
  })
}

exports.down = async knex => {
  await knex.schema.dropTable(booking_time_series)
}
```