Skip to content

Instantly share code, notes, and snippets.

@kokizzu
Last active October 4, 2025 15:17
Show Gist options
  • Save kokizzu/50f564c23894a9c8abded6c09b3ae1eb to your computer and use it in GitHub Desktop.
Save kokizzu/50f564c23894a9c8abded6c09b3ae1eb to your computer and use it in GitHub Desktop.

Revisions

  1. kokizzu renamed this gist Oct 3, 2025. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. kokizzu revised this gist Oct 3, 2025. 1 changed file with 37 additions and 0 deletions.
    37 changes: 37 additions & 0 deletions 003_004.sql
    Original file line number Diff line number Diff line change
    @@ -1390,6 +1390,43 @@ WHERE toYYYYMM(timestamp) = '202508'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202509'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
  3. kokizzu revised this gist Oct 3, 2025. 1 changed file with 820 additions and 1 deletion.
    821 changes: 820 additions & 1 deletion 003_004.sql
    Original file line number Diff line number Diff line change
    @@ -864,4 +864,823 @@ SELECT 'best_pool_resolved_by_qmint rows:', count() FROM indexer.best_pool_resol
    /* ---------- Migration bookkeeping ---------- */

    INSERT INTO system_migrations (name)
    VALUES ('004_good_pools_and_metrics');
    VALUES ('004_good_pools_and_metrics');

    -- from 005_split_per_date.sql
    ------------------------------




    -- 1. Create the target table for MV
    CREATE TABLE IF NOT EXISTS indexer.k_parsed_transactions_by_quote
    (
    signature String,
    slot UInt64,
    timestamp DateTime64(9),
    method LowCardinality(String),
    platform LowCardinality(String),
    wallet FixedString(32),
    quote_mint FixedString(32),
    base_mint FixedString(32),
    pool String,
    base_amount Float64,
    quote_amount Float64,
    usd_amount Float64,
    price_base Float64,
    price_usd Float64,
    quote_decimals UInt8,
    base_decimals UInt8,
    pre_quote_balance Float64,
    post_quote_balance Float64,
    pre_base_balance Float64,
    post_base_balance Float64,
    base_liquidity Float64,
    quote_liquidity Float64,
    base_liquidity_real Float64,
    quote_liquidity_real Float64,
    transaction_index UInt32,
    instruction_index UInt32,
    inner_instruction_index UInt32,
    tx_fees Float64,
    platform_fees Float64,
    processor_tips Float64,
    platform_type LowCardinality(String)
    )
    ENGINE = MergeTree
    ORDER BY (quote_mint, wallet, method, slot, transaction_index, instruction_index, inner_instruction_index);

    -- 2. Create MV that auto-populates into the target
    CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.k_mv_parsed_transactions_by_quote
    TO indexer.k_parsed_transactions_by_quote
    AS
    SELECT
    signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized;

    -- 3. backfill per yyyymm
    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202409'
    ;

    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202410'
    ;

    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202411'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202412'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202501'
    ;

    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202502'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202503'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202504'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202505'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202506'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202507'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202508'
    ;


    INSERT INTO indexer.k_parsed_transactions_by_quote
    SELECT signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.parsed_transactions_optimized
    WHERE toYYYYMM(timestamp) = '202509'
    ;

    -- 4. check whether it's getting faster or not

    EXPLAIN indexes = 1
    SELECT
    signature,
    slot,
    timestamp,
    method,
    platform,
    wallet,
    quote_mint,
    base_mint,
    pool,
    base_amount,
    quote_amount,
    usd_amount,
    price_base,
    price_usd,
    quote_decimals,
    base_decimals,
    pre_quote_balance,
    post_quote_balance,
    pre_base_balance,
    post_base_balance,
    base_liquidity,
    quote_liquidity,
    base_liquidity_real,
    quote_liquidity_real,
    transaction_index,
    instruction_index,
    inner_instruction_index,
    tx_fees,
    platform_fees,
    processor_tips,
    platform_type
    FROM indexer.k_parsed_transactions_by_quote
    WHERE wallet IN (
    SELECT toFixedString(base58Decode(w), 32)
    FROM (SELECT arrayJoin([
    '4GQeEya6ZTwvXre4Br6ZfDyfe2WQMkcDz2QbkJZazVqS',
    'cook3vQojECt9LgMUse6Y3PLDzzUmdVGt1Uz8sohGai',
    '86ca1fHVACfW57vQaGwvx2841WnpphuT5YF3WoHxuouo',
    'H3jfDkDCeBgzM5wm7WCwtjdMC48poTAKXbEeWdvKLnVL',
    '3tc4BVAdzjr1JpeZu6NAjLHyp4kK3iic7TexMBYGJ4Xk',
    'RFSqPtn1JfavGiUD4HJsZyYXvZsycxf31hnYfbyG6iB',
    '8deJ9xeUvXSJwicYptA9mHsU2rN2pDx37KWzkDkEXhU6',
    'DfMxre4cKmvogbLrPigxmibVTTQDuzjdXojWzjCXXhzj',
    /* ⚠️ The next two end with "pump" and are not valid Base58 pubkeys.
    Remove or replace them, or base58Decode will throw. */
    -- '34qXg7fx9AJDmBA1fRs8xMsfNvWQbabkdkEWfGSRpump',
    '8zFZHuSRuDpuAR7J6FzwyF3vKNx4CVW3DFHJerQhc7Zd',
    'FpR5RsXauqdsGhgdW9C6zmBHMkcT9HJvXJQDhSUa4kYe',
    '3kebnKw7cPdSkLRfiMEALyZJGZ4wdiSRvmoN4rD1yPzV',
    '4Be9CvxqHW6BYiRAxW9Q3xu1ycTMWaL5z8NX4HR3ha7t',
    '9jyqFiLnruggwNn4EQwBNFXwpbLM9hrA4hV59ytyAVVz',
    '3rSZJHysEk2ueFVovRLtZ8LGnQBMZGg96H2Q4jErspAF',
    'CRVidEDtEUTYZisCxBZkpELzhQc9eauMLR3FWg74tReL',
    '73LnJ7G9ffBDjEBGgJDdgvLUhD5APLonKrNiHsKDCw5B',
    '91HANKimRsWTXMkfcDipLaDVmeTkj6m2eNSVobpkdUNz',
    'GQWLRHtR18vy8myoHkgc9SMcSzwUdBjJ816vehSBwcis',
    '62FZUSWPMX9pofoV1uWHMdzFJRjwMa1LHgh2zhdEB7Zj',
    'AVAZvHLR2PcWpDf8BXY4rVxNHYRBytycHkcB5z5QNXYm',
    '73KcgcTVyLYhnVX3PJbit8e4mC7jU8QotNGBFGix14jn',
    'BDC19MFbzDpoF37QCxe4JR6i7UmXNMSiEqrxuQgJoWte',
    '4vgDxDJubQxmHsaNxgsTESer9Qgk1bGKuE1dN4fXzsoj',
    '26kZ9rg8Y5pd4j1tdT4cbT8BQRu5uDbXkaVs3L5QasHy',
    '5TMR5GFPTzkNhprnty2ocXEFLoLur1uchg4NkEr5WEf4',
    'G2VzymsKt3zNAn4CKBndYcS67w6Kny5sDEp7Y2W1aTf6',
    'GfXQesPe3Zuwg8JhAt6Cg8euJDTVx751enp9EQQmhzPH',
    'dVs7zZksjFuq73xbtUC62brFXYYuxCuPSG4wZeGiHck',
    'DGPYpCdiVg2shab2TnNiZ2RnsjBQSmhgN71hJyWC5cYn',
    'GTvBQnRvAPweU2qmYg8MDLND2PAAyYFKe35aKQGMRDaL',
    'EcJWNtETrzdbj8s2dXpaE4Tu4r7fxALD6TNw9H8S6ksz',
    '7nt58K2KF8HwBiCcRprCYP7civVifXP59fE4x229gAuv',
    '2QwCvtKyVew25mvDDJDwgopMhwRmNQqqGDsjCBVACo8A',
    '4aDdi3EiDPMbeZ3e5BvbFMt4vfJaoahaHxZuwKQRtFc1',
    'Hg5SEgwdHw8FUoKMPDZbREii3qEXwn6EvvsUdBPM2rmi',
    '7Dt5oUpxHWuKH8bCTXDLz2j3JyxA7jEmtzqCG6pnh96X',
    'AbcX4XBm7DJ3i9p29i6sU8WLmiW4FWY5tiwB9D6UBbcE',
    'HrTZPWV4ZPebBiwyzoTBajCD49kQqVwf4dwsLuYG8CXX',
    '4hSXPtxZgXFpo6Vxq9yqxNjcBoqWN3VoaPJWonUtupzD',
    'BrNoqdHUCcv9yTncnZeSjSov8kqhpmzv1nAiPbq1M95H',
    '28ipXVfkdmu1PDowCHbcSfzkpH9edZmSiVoDhY5xGVfR',
    'HsfSzb7BBv4oKNFamgoyBEQ4v6BHUhEREEmBnWzZJXLj',
    'ECCKBDWX3MkEcf3bULbLBb9FvrEQLsmPMFTKFpvjzqgP',
    'EDmbDc7sY87dKszqyZ3rHczWbKcCyUvJQSJpE3Cg4RcZ',
    'DSVc1Rd69sLcXBoZAsvkzK4jGFyJKP77K3J2CaFVNAFP',
    'HEibUTHW7JHt7VgKYtWpQCSX7a6YHBmRQrixZs9i4wGY',
    'CAUbSmiNuj16phNiskMdwWZEAUXCfXaUSamDFyf7pAa6',
    '6RoLbZJWJHpTk4sdPsWzocEHiRtzPS36WcBjnMXuQrfU',
    'JDd3hy3gQn2V982mi1zqhNqUw1GfV2UL6g76STojCJPN',
    '9UWZFoiCHeYRLmzmDJhdMrP7wgrTw7DMSpPiT2eHgJHe',
    'EXEeLF1YiZxC1tFneoQPv6rcdq5VMUbKNGy3DLPEo2oH',
    'D3Z7weHeLGWA7eg1qwVB66NCs8YLxiDTBVE6Eb1tTmwg',
    'HVajxfNTWqLGxsfJA9DFThnvddveKfJLK8re1kNpeCVv',
    '3PgV4tvihn16J6uaQsKeekChCAH7MKyxGQYb7eaRJbMu',
    'AT6FsbUy1jNR2AHE35WCVh3nSdDtYADtdhA2KitQXmL5',
    'HdxkiXqeN6qpK2YbG51W23QSWj3Yygc1eEk2zwmKJExp',
    'Fxj1j5ohVoRos1yQnvx6bsvYCt493VGpyLbMiq4eLff3',
    '7VBTpiiEjkwRbRGHJFUz6o5fWuhPFtAmy8JGhNqwHNnn',
    '6LChaYRYtEYjLEHhzo4HdEmgNwu2aia8CM8VhR9wn6n7',
    'DNfuF1L62WWyW3pNakVkyGGFzVVhj4Yr52jSmdTyeBHm',
    '636N7frU8bUwYfyUAtvMQQsXhTFRuSWjxnEZihr5axGV',
    'DpNVrtA3ERfKzX4F8Pi2CVykdJJjoNxyY5QgoytAwD26',
    'f1BuDDhr9myYFm8X49sQ5RharwsoNPvXDtB18CMA1fu',
    'HABhDh9zrzf8mA4SBo1yro8M6AirH2hZdLNPpuvMH6iA',
    '9ecRdqNCBxwReiY2pxtnLeayeBzE1CrSSmKKJJGdmsaJ',
    'F72vY99ihQsYwqEDCfz7igKXA5me6vN2zqVsVUTpw6qL',
    'FAwHi11KyJVh2pR2b2vNFEbbunTVkMFpdNBuD2dh9NRf',
    'HmBmSYwYEgEZuBUYuDs9xofyqBAkw4ywugB1d7R7sTGh',
    '7iabBMwmSvS4CFPcjW2XYZY53bUCHzXjCFEFhxeYP4CY',
    'AJ6MGExeK7FXmeKkKPmALjcdXVStXYokYNv9uVfDRtvo',
    'CyaE1VxvBrahnPWkqm5VsdCvyS2QmNht2UFrKJHga54o',
    'BCnqsPEtA1TkgednYEebRpkmwFRJDCjMQcKZMMtEdArc',
    '7Ny3AmDQ1cnQ29ywXdQheaoeTPvo6NGkDDtGa5bK9wRH',
    /* ⚠️ Same here — ends with "pump". Remove or replace. */
    -- '36Qg5TussomUkM7g9FUR8KNcUNwDPbL8jdurKvw3pump',
    'suqh5sHtr8HyJ7q8scBimULPkPpA557prMG47xCHQfK',
    '4zq1iLpmepj2Rj7W6A3XQMRQA1HyjYqVpZiBzM6aPyH7',
    '4gbKBZQ8aaXfmuwq2bbN6cFJEhLm1uxHv2ravrkYTgBU',
    'o5e4MABwiMuhnwXWUPgwnhvGPK91i2ytnWZvAz2begi',
    '7SDs3PjT2mswKQ7Zo4FTucn9gJdtuW4jaacPA65BseHS',
    '77kq9xNZg4Vb94UJMpeHsx7pKXUBU3Wpqsm5UXhjA9dd',
    '831qmkeGhfL8YpcXuhrug6nHj1YdK3aXMDQUCo85Auh1'
    ]) AS w)
    )
    AND quote_mint = toFixedString(base58Decode('MEFNBXixkEbait3xn9bkm8WsJzXtVsaJEn4c8Sam21u'), 32)
    AND method IN ('buy', 'sell')
    ORDER BY (slot, transaction_index, instruction_index, inner_instruction_index) DESC
    LIMIT 1000 OFFSET 0
    SETTINGS optimize_read_in_order=0, enable_filesystem_cache=0;


    -- to undo
    /*
    DROP VIEW indexer.k_mv_parsed_transactions_by_quote;
    DROP TABLE indexer.k_parsed_transactions_by_quote
    SETTINGS max_table_size_to_drop = 0;
    */

    INSERT INTO system_migrations (name)
    VALUES ('005_wallet_quote_mint');

    -- from 006_various_changes.sql
    -------------------------------

    /* ===============================
    MIGRATION 006 — Various changes
    =============================== */

    USE indexer;

    SET max_memory_usage = 480000000000,
    max_execution_time = 0,
    send_timeout = 0,
    receive_timeout = 0,
    optimize_on_insert = 0,
    max_rows_to_read = 0,
    max_bytes_to_read = 0,
    max_threads = 56,
    max_bytes_before_external_group_by = '100G',
    group_by_two_level_threshold = 500000,
    group_by_two_level_threshold_bytes = '8G',
    max_bytes_before_external_sort = '100G',
    allow_nondeterministic_mutations = 1,
    max_table_size_to_drop = 0;





    DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint_mv;
    DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint;

    ALTER TABLE indexer.parsed_new_token_transactions_optimized
    DROP PROJECTION IF EXISTS uri_search_projection;




    /* ---------- Trading Volume Hourly Materialized View ---------- */

    CREATE TABLE indexer.mv_trade_volume_hourly_by_mint
    (
    `mint` FixedString(32),
    `hour` DateTime('UTC'),
    `total_volume` AggregateFunction(sum, Float64),
    `buy_volume` AggregateFunction(sum, Float64),
    `sell_volume` AggregateFunction(sum, Float64),
    `total_txns` AggregateFunction(count),
    `buy_txns` AggregateFunction(count),
    `sell_txns` AggregateFunction(count),

    PROJECTION hour_projection
    (
    SELECT *
    ORDER BY hour
    ),
    PROJECTION mint_hour_projection
    (
    SELECT *
    ORDER BY mint, hour
    )
    )
    ENGINE = AggregatingMergeTree
    PARTITION BY toYYYYMM(hour)
    ORDER BY (mint, hour)
    SETTINGS index_granularity = 8192,
    deduplicate_merge_projection_mode = 'rebuild';

    CREATE MATERIALIZED VIEW indexer.mv_trade_volume_hourly_by_mint_mv
    TO indexer.mv_trade_volume_hourly_by_mint
    AS SELECT
    quote_mint AS mint,
    toStartOfHour(timestamp) AS hour,
    sumState(usd_amount) AS total_volume,
    sumStateIf(usd_amount, method = 'buy') AS buy_volume,
    sumStateIf(usd_amount, method = 'sell') AS sell_volume,
    countState() AS total_txns,
    countStateIf(method = 'buy') AS buy_txns,
    countStateIf(method = 'sell') AS sell_txns
    FROM indexer.parsed_transactions_optimized
    WHERE is_swap = 1
    GROUP BY mint, hour;



    /* ---------- URI Search Projection ---------- */
    ALTER TABLE indexer.parsed_new_token_transactions_optimized
    ADD PROJECTION uri_search_projection
    (
    SELECT
    *
    ORDER BY uri, timestamp
    );

    ALTER TABLE indexer.parsed_new_token_transactions_optimized
    MATERIALIZE PROJECTION uri_search_projection;


    INSERT INTO indexer.mv_trade_volume_hourly_by_mint
    SELECT
    quote_mint AS mint,
    toStartOfHour(timestamp) AS hour,
    sumState(usd_amount) AS total_volume,
    sumStateIf(usd_amount, method = 'buy') AS buy_volume,
    sumStateIf(usd_amount, method = 'sell') AS sell_volume,
    countState() AS total_txns,
    countStateIf(method = 'buy') AS buy_txns,
    countStateIf(method = 'sell') AS sell_txns
    FROM indexer.parsed_transactions_optimized
    WHERE is_swap = 1
    GROUP BY mint, hour;

    /* ---------- Migration bookkeeping ---------- */

    INSERT INTO system_migrations (name)
    VALUES ('006_various_changes');
  4. kokizzu revised this gist Oct 3, 2025. 1 changed file with 99 additions and 0 deletions.
    99 changes: 99 additions & 0 deletions 003_004.sql
    Original file line number Diff line number Diff line change
    @@ -766,3 +766,102 @@ GROUP BY
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)));


    /* ---------- Backfill best_pool_resolved_by_qmint (with embedded metrics) ---------- */

    INSERT INTO indexer.best_pool_resolved_by_qmint
    WITH
    toFixedString('', 32) AS empty32,
    (
    empty32,
    CAST('' AS LowCardinality(String)),
    CAST('' AS LowCardinality(String))
    ) AS empty_tuple
    SELECT
    q.quote_mint,

    /* identity columns for the primary (resolved) pool */
    primary_tuple.1 AS pool,
    primary_tuple.2 AS platform,
    primary_tuple.3 AS launchpad,
    CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool) AS is_launchpad,
    CAST(q.mig IS NOT NULL AS Bool) AS is_migration,

    /* historical (optional) identity */
    if(
    q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
    q.lp.1, CAST(NULL AS Nullable(FixedString(32)))
    ) AS historical_pool,
    if(
    q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
    q.lp.2, CAST(NULL AS Nullable(String))
    ) AS historical_platform,
    if(
    q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
    q.lp.3, CAST(NULL AS Nullable(String))
    ) AS historical_launchpad,

    /* === merged metrics across chosen pools === */
    countIfMergeState(p.buy_count) AS buy_count,
    countIfMergeState(p.sell_count) AS sell_count,
    sumMergeState(p.buy_base_amount) AS buy_base_amount,
    sumMergeState(p.sell_base_amount) AS sell_base_amount,
    sumMergeState(p.buy_usd_amount) AS buy_usd_amount,
    sumMergeState(p.sell_usd_amount) AS sell_usd_amount,
    argMaxMergeState(p.price_base) AS price_base,
    argMaxMergeState(p.price_usd) AS price_usd,
    argMaxMergeState(p.base_liquidity) AS base_liquidity,
    argMaxMergeState(p.quote_liquidity) AS quote_liquidity,
    argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real,
    argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real,
    minMergeState(p.first_transaction_time) AS first_transaction_time
    FROM
    (
    /* Resolve the tuple set per quote_mint */
    SELECT
    quote_mint,
    if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple) AS mig,
    if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple) AS lp,
    if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) AS best,
    coalesce(
    if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple),
    if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple),
    if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple)
    ) AS primary_tuple
    FROM indexer.best_pools_by_qmint_agg
    GROUP BY quote_mint
    ) AS q
    /* expand to the set of chosen pools: primary + (optional) historical lp */
    ARRAY JOIN arrayFilter(x -> x.1 != empty32,
    [ primary_tuple,
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp, empty_tuple)
    ]) AS chosen_tuple
    ANY LEFT JOIN indexer.pool_aggregates_optimized AS p
    ON p.pool = chosen_tuple.1
    GROUP BY
    q.quote_mint,
    primary_tuple.1, primary_tuple.2, primary_tuple.3,
    CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool),
    CAST(q.mig IS NOT NULL AS Bool),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)))
    SETTINGS
    max_memory_usage = '120G',
    max_bytes_in_join = '120G',
    join_algorithm = 'partial_merge',
    max_threads = 32;


    /* ---------- Counts for quick sanity ---------- */

    SELECT 'pool_aggregates_optimized rows:', count() FROM indexer.pool_aggregates_optimized;
    SELECT 'best_pools_by_qmint_agg rows:', count() FROM indexer.best_pools_by_qmint_agg;
    SELECT 'best_pool_resolved_by_qmint rows:', count() FROM indexer.best_pool_resolved_by_qmint;


    /* ---------- Migration bookkeeping ---------- */

    INSERT INTO system_migrations (name)
    VALUES ('004_good_pools_and_metrics');
  5. kokizzu revised this gist Oct 3, 2025. 1 changed file with 116 additions and 0 deletions.
    116 changes: 116 additions & 0 deletions 003_004.sql
    Original file line number Diff line number Diff line change
    @@ -650,3 +650,119 @@ GROUP BY quote_mint
    SETTINGS
    max_memory_usage = '80G',
    max_threads = 32;


    /* ==============================================
    CHANGE #3 — Resolve single best (and historical)
    + embed ALL metrics directly here
    ============================================== */

    CREATE TABLE IF NOT EXISTS indexer.best_pool_resolved_by_qmint
    (
    quote_mint FixedString(32),
    pool FixedString(32),
    platform LowCardinality(String),
    launchpad LowCardinality(String),
    is_launchpad Bool,
    is_migration Bool,
    historical_pool Nullable(FixedString(32)),
    historical_platform Nullable(String), -- previously Nullable(LowCardinality(String
    historical_launchpad Nullable(String), -- previously Nullable(LowCardinality(String

    /* === embedded metrics aggregated across the chosen pools (primary + optional historical) === */
    buy_count AggregateFunction(countIf, Bool),
    sell_count AggregateFunction(countIf, Bool),
    buy_base_amount AggregateFunction(sum, UInt64),
    sell_base_amount AggregateFunction(sum, UInt64),
    buy_usd_amount AggregateFunction(sum, Float64),
    sell_usd_amount AggregateFunction(sum, Float64),
    price_base AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    price_usd AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    base_liquidity AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    quote_liquidity AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    base_liquidity_real AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    quote_liquidity_real AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    first_transaction_time AggregateFunction(min, DateTime('UTC'))
    )
    ENGINE = ReplacingMergeTree
    ORDER BY quote_mint;

    CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_pool_resolved_by_qmint
    TO indexer.best_pool_resolved_by_qmint
    AS
    WITH
    toFixedString('', 32) AS empty32,
    (
    empty32,
    CAST('' AS LowCardinality(String)),
    CAST('' AS LowCardinality(String))
    ) AS empty_tuple
    SELECT
    q.quote_mint,

    /* identity columns for the primary (resolved) pool */
    primary_tuple.1 AS pool,
    primary_tuple.2 AS platform,
    primary_tuple.3 AS launchpad,
    CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool) AS is_launchpad,
    CAST(q.mig IS NOT NULL AS Bool) AS is_migration,

    /* historical (optional) identity */
    if(
    q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
    q.lp.1, CAST(NULL AS Nullable(FixedString(32)))
    ) AS historical_pool,
    if(
    q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
    q.lp.2, CAST(NULL AS Nullable(String))
    ) AS historical_platform,
    if(
    q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1,
    q.lp.3, CAST(NULL AS Nullable(String))
    ) AS historical_launchpad,

    /* === merged metrics across chosen pools === */
    countIfMergeState(p.buy_count) AS buy_count,
    countIfMergeState(p.sell_count) AS sell_count,
    sumMergeState(p.buy_base_amount) AS buy_base_amount,
    sumMergeState(p.sell_base_amount) AS sell_base_amount,
    sumMergeState(p.buy_usd_amount) AS buy_usd_amount,
    sumMergeState(p.sell_usd_amount) AS sell_usd_amount,
    argMaxMergeState(p.price_base) AS price_base,
    argMaxMergeState(p.price_usd) AS price_usd,
    argMaxMergeState(p.base_liquidity) AS base_liquidity,
    argMaxMergeState(p.quote_liquidity) AS quote_liquidity,
    argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real,
    argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real,
    minMergeState(p.first_transaction_time) AS first_transaction_time
    FROM
    (
    /* Resolve the tuple set per quote_mint */
    SELECT
    quote_mint,
    if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple) AS mig,
    if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple) AS lp,
    if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) AS best,
    coalesce(
    if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple),
    if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple),
    if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple)
    ) AS primary_tuple
    FROM indexer.best_pools_by_qmint_agg
    GROUP BY quote_mint
    ) AS q
    /* expand to the set of chosen pools: primary + (optional) historical lp */
    ARRAY JOIN arrayFilter(x -> x.1 != empty32,
    [ primary_tuple,
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp, empty_tuple)
    ]) AS chosen_tuple
    ANY LEFT JOIN indexer.pool_aggregates_optimized AS p
    ON p.pool = chosen_tuple.1
    GROUP BY
    q.quote_mint,
    primary_tuple.1, primary_tuple.2, primary_tuple.3,
    CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool),
    CAST(q.mig IS NOT NULL AS Bool),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))),
    if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)));
  6. kokizzu revised this gist Oct 3, 2025. 1 changed file with 87 additions and 0 deletions.
    87 changes: 87 additions & 0 deletions 003_004.sql
    Original file line number Diff line number Diff line change
    @@ -563,3 +563,90 @@ SETTINGS
    group_by_two_level_threshold_bytes = '96M',
    max_bytes_before_external_sort = '16G';


    /* ==============================================
    CHANGE #2 — Best pools per quote_mint (agg)
    ============================================== */

    CREATE TABLE IF NOT EXISTS indexer.best_pools_by_qmint_agg
    (
    quote_mint FixedString(32),

    -- official pools
    migration_state AggregateFunction(any,
    Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))),
    launchpad_state AggregateFunction(any,
    Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))),

    -- best non-official by latest known liquidity (UInt64)
    best_liq_state AggregateFunction(argMax,
    Tuple(FixedString(32), LowCardinality(String), LowCardinality(String)),
    UInt64)
    )
    ENGINE = ReplicatedAggregatingMergeTree(
    '/clickhouse/tables/{shard}/indexer/best_pools_by_qmint_agg', '{replica}'
    )
    ORDER BY quote_mint;

    CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_from_snapshot
    TO indexer.best_pools_by_qmint_agg
    AS
    SELECT
    quote_mint,

    -- unique migration pool per mint
    anyStateIf( (pool, platform, launchpad), is_migration = 1 ) AS migration_state,

    -- unique launchpad pool per mint (non-empty launchpad label)
    anyStateIf( (pool, platform, launchpad), launchpad <> '' ) AS launchpad_state,

    -- best non-official by latest liquidity snapshot
    argMaxStateIf(
    (pool, platform, launchpad),
    last_liq,
    (is_migration = 0) AND (launchpad = '')
    ) AS best_liq_state
    FROM
    (
    /* Snapshot per pool (functionally dependent columns via any*; no nested aggs above this level) */
    SELECT
    any(quote_mint) AS quote_mint,
    pool,
    any(platform) AS platform,
    any(launchpad) AS launchpad,

    -- latest known liquidity per pool (UInt64)
    argMaxMerge(quote_liquidity_real) AS last_liq,

    -- ever-true migration flag
    maxMerge(is_migration_state) AS is_migration
    FROM indexer.pool_aggregates_optimized
    GROUP BY pool
    )
    GROUP BY quote_mint;

    /* ---------- Backfill best_pools_by_qmint_agg ---------- */

    INSERT INTO indexer.best_pools_by_qmint_agg
    SELECT
    quote_mint,
    anyStateIf( (pool, platform, launchpad), is_migration = 1 ) AS migration_state,
    anyStateIf( (pool, platform, launchpad), launchpad <> '' ) AS launchpad_state,
    argMaxStateIf( (pool, platform, launchpad), last_liq,
    (is_migration = 0) AND (launchpad = '') ) AS best_liq_state
    FROM
    (
    SELECT
    any(quote_mint) AS quote_mint,
    pool,
    any(platform) AS platform,
    any(launchpad) AS launchpad,
    argMaxMerge(quote_liquidity_real) AS last_liq,
    maxMerge(is_migration_state) AS is_migration
    FROM indexer.pool_aggregates_optimized
    GROUP BY pool
    )
    GROUP BY quote_mint
    SETTINGS
    max_memory_usage = '80G',
    max_threads = 32;
  7. kokizzu renamed this gist Oct 3, 2025. 1 changed file with 208 additions and 1 deletion.
    209 changes: 208 additions & 1 deletion 003.sql → 003_004.sql
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,6 @@
    -- from 003_candles_new_migration.sql
    -------------------------------------

    USE indexer;
    SET max_memory_usage = 0,
    max_execution_time = 0,
    @@ -355,4 +358,208 @@ SELECT
    FROM indexer.ohlcv_agg_slot;

    INSERT INTO system_migrations (name)
    VALUES ('003_candles_new_migration');
    VALUES ('003_candles_new_migration');

    -- from 004_pools_and_metrics__pool_aggregates_optimized2.sql
    -------------------------------------------------------------

    /* ===============================
    MIGRATION 004 — Good Pools & Metrics (metrics embedded in best_pool_resolved_by_qmint)
    =============================== */

    USE indexer;

    SET max_memory_usage = '150G',
    max_execution_time = 0,
    send_timeout = 0,
    receive_timeout = 0,
    optimize_on_insert = 0,
    max_rows_to_read = 0,
    max_bytes_to_read = 0,
    max_threads = 8,
    max_bytes_before_external_group_by = '3G',
    group_by_two_level_threshold = 100000,
    group_by_two_level_threshold_bytes = '128M',
    max_bytes_before_external_sort = '2G',
    allow_nondeterministic_mutations = 1,
    max_table_size_to_drop = 0;

    /* ---------- DROP old objects in dependency order (if exist) ---------- */

    DROP VIEW IF EXISTS indexer.mv_best_pool_metrics_by_qmint;
    DROP TABLE IF EXISTS indexer.best_pool_metrics_by_qmint;

    DROP VIEW IF EXISTS indexer.mv_best_pool_resolved_by_qmint;
    DROP TABLE IF EXISTS indexer.best_pool_resolved_by_qmint;

    DROP VIEW IF EXISTS indexer.mv_best_from_snapshot;
    DROP TABLE IF EXISTS indexer.best_pools_by_qmint_agg;

    DROP VIEW IF EXISTS indexer.mv_pool_aggregates_optimized;
    DROP TABLE IF EXISTS indexer.pool_aggregates_optimized;

    DROP VIEW IF EXISTS indexer.mv_pool_aggregates_optimized2;
    DROP TABLE IF EXISTS indexer.pool_aggregates_optimized2;

    /* ==============================================
    CHANGE #1 — Pool aggregates (optimized)
    ============================================== */

    CREATE TABLE indexer.pool_aggregates_optimized
    (
    /* functionally dependent on pool */
    `base_mint` FixedString(32),
    `quote_mint` FixedString(32),
    `pool` FixedString(32),

    /* labels */
    `platform` LowCardinality(String),
    `launchpad` LowCardinality(String),

    /* ever-true flags (logical OR over history) */
    `is_migration_state` AggregateFunction(max, Bool),

    /* counters & sums */
    `buy_count` AggregateFunction(countIf, Bool),
    `sell_count` AggregateFunction(countIf, Bool),
    `buy_base_amount` AggregateFunction(sum, UInt64),
    `sell_base_amount` AggregateFunction(sum, UInt64),
    `buy_usd_amount` AggregateFunction(sum, Float64),
    `sell_usd_amount` AggregateFunction(sum, Float64),

    /* recency-weighted by position tuple */
    `price_base` AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    `price_usd` AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)),
    `base_liquidity` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    `quote_liquidity` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    `base_liquidity_real` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),
    `quote_liquidity_real` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)),

    /* earliest position */
    `first_transaction_time` AggregateFunction(min, DateTime('UTC')),

    /* order-only projection */
    PROJECTION quote_mint_projection
    (
    SELECT *
    ORDER BY quote_mint, pool
    )
    )
    ENGINE = ReplicatedAggregatingMergeTree(
    '/clickhouse/tables/{shard}/indexer/pool_aggregates_optimized_new', '{replica}'
    )
    ORDER BY pool
    SETTINGS index_granularity = 2048,
    deduplicate_merge_projection_mode = 'drop';

    CREATE MATERIALIZED VIEW indexer.mv_pool_aggregates_optimized
    TO indexer.pool_aggregates_optimized
    AS
    SELECT
    /* states / labels via any() because we GROUP BY pool */
    any(base_mint) AS base_mint,
    any(quote_mint) AS quote_mint,
    pool,
    any(platform) AS platform,
    any(launchpad) AS launchpad,

    /* ever-true flags */
    maxState(is_migration) AS is_migration_state,

    /* measures */
    countIfState(method = 'buy') AS buy_count,
    countIfState(method = 'sell') AS sell_count,
    sumIfState(base_amount, method = 'buy') AS buy_base_amount,
    sumIfState(base_amount, method = 'sell') AS sell_base_amount,
    sumIfState(usd_amount, method = 'buy') AS buy_usd_amount,
    sumIfState(usd_amount, method = 'sell') AS sell_usd_amount,

    /* recency by position tuple (slot, tx_idx, ix_idx, inner_ix_idx)
    Apply the ≥100 base/quote thresholds and require swaps for prices. */
    argMaxStateIf(
    price_base,
    (slot, transaction_index, instruction_index, inner_instruction_index),
    (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_base,
    argMaxStateIf(
    price_usd,
    (slot, transaction_index, instruction_index, inner_instruction_index),
    (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_usd,

    /* liquidity snapshots by recency (no additional filter) */
    argMaxState(
    base_liquidity,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS base_liquidity,
    argMaxState(
    quote_liquidity,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS quote_liquidity,
    argMaxState(
    base_liquidity_real,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS base_liquidity_real,
    argMaxState(
    quote_liquidity_real,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS quote_liquidity_real,

    /* earliest observed ts */
    minState(timestamp) AS first_transaction_time
    FROM indexer.parsed_transactions_optimized
    GROUP BY pool;

    /* ---------- Backfill pool_aggregates_optimized ---------- */

    INSERT INTO indexer.pool_aggregates_optimized
    SELECT
    any(base_mint) AS base_mint,
    any(quote_mint) AS quote_mint,
    pool,
    any(platform) AS platform,
    any(launchpad) AS launchpad,
    maxState(is_migration) AS is_migration_state,
    countIfState(method = 'buy') AS buy_count,
    countIfState(method = 'sell') AS sell_count,
    sumIfState(base_amount, method = 'buy') AS buy_base_amount,
    sumIfState(base_amount, method = 'sell') AS sell_base_amount,
    sumIfState(usd_amount, method = 'buy') AS buy_usd_amount,
    sumIfState(usd_amount, method = 'sell') AS sell_usd_amount,
    argMaxStateIf(
    price_base,
    (slot, transaction_index, instruction_index, inner_instruction_index),
    (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_base,
    argMaxStateIf(
    price_usd,
    (slot, transaction_index, instruction_index, inner_instruction_index),
    (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100)
    ) AS price_usd,
    argMaxState(
    base_liquidity,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS base_liquidity,
    argMaxState(
    quote_liquidity,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS quote_liquidity,
    argMaxState(
    base_liquidity_real,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS base_liquidity_real,
    argMaxState(
    quote_liquidity_real,
    (slot, transaction_index, instruction_index, inner_instruction_index)
    ) AS quote_liquidity_real,
    minState(timestamp) AS first_transaction_time
    FROM indexer.parsed_transactions_optimized
    GROUP BY pool
    SETTINGS
    max_memory_usage = '120G',
    max_threads = 16,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold = 80000,
    group_by_two_level_threshold_bytes = '96M',
    max_bytes_before_external_sort = '16G';

  8. kokizzu created this gist Oct 3, 2025.
    358 changes: 358 additions & 0 deletions 003.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,358 @@
    USE indexer;
    SET max_memory_usage = 0,
    max_execution_time = 0,
    send_timeout = 0,
    receive_timeout = 0,
    optimize_on_insert = 0,
    max_rows_to_read = 0,
    max_bytes_to_read = 0,
    max_threads = 8,
    max_bytes_before_external_group_by = '3G',
    group_by_two_level_threshold = 100000,
    group_by_two_level_threshold_bytes = '128M',
    max_bytes_before_external_sort = '2G',
    allow_nondeterministic_mutations = 1,
    max_table_size_to_drop = 0;
    DROP TABLE IF EXISTS indexer.mv_ohlcv_agg_slot;
    DROP TABLE IF EXISTS indexer.ohlcv_agg_slot;
    DROP TABLE IF EXISTS indexer.mv_mev_sandwich_slots;
    DROP TABLE IF EXISTS indexer.mev_sandwich_slots;

    CREATE TABLE indexer.ohlcv_agg_slot
    (
    quote_mint FixedString(32),
    pool FixedString(32),
    is_launchpad UInt8,
    bucket UInt64,
    bucket_ts DateTime,
    open_state_usd AggregateFunction(argMin, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    high_state_usd AggregateFunction(max, Float64),
    low_state_usd AggregateFunction(min, Float64),
    close_state_usd AggregateFunction(argMax, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    vol_state_usd AggregateFunction(sumIf, Float64, UInt8),
    open_state_base AggregateFunction(argMin, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    high_state_base AggregateFunction(max, Float64),
    low_state_base AggregateFunction(min, Float64),
    close_state_base AggregateFunction(argMax, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    vol_state_base AggregateFunction(sumIf, UInt64, UInt8)
    )
    ENGINE = AggregatingMergeTree
    ORDER BY (quote_mint, bucket, pool, is_launchpad);

    CREATE MATERIALIZED VIEW indexer.mv_ohlcv_agg_slot TO indexer.ohlcv_agg_slot
    (
    quote_mint FixedString(32),
    pool FixedString(32),
    is_launchpad UInt8,
    bucket UInt64,
    bucket_ts DateTime('UTC'),
    open_state_usd AggregateFunction(argMin, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    high_state_usd AggregateFunction(max, Float64),
    low_state_usd AggregateFunction(min, Float64),
    close_state_usd AggregateFunction(argMax, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    vol_state_usd AggregateFunction(sumIf, Float64, UInt8),
    open_state_base AggregateFunction(argMin, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    high_state_base AggregateFunction(max, Float64),
    low_state_base AggregateFunction(min, Float64),
    close_state_base AggregateFunction(argMax, Float64, Tuple(
    UInt32,
    Int8,
    Int8)),
    vol_state_base AggregateFunction(sumIf, UInt64, UInt8)
    )
    AS SELECT
    quote_mint,
    pool,
    toUInt8(is_launchpad) AS is_launchpad,
    slot AS bucket,
    timestamp AS bucket_ts,
    argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd,
    maxState(price_usd) AS high_state_usd,
    minState(price_usd) AS low_state_usd,
    argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd,
    sumIfState(usd_amount, is_swap = 1) AS vol_state_usd,
    argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base,
    maxState(price_base) AS high_state_base,
    minState(price_base) AS low_state_base,
    argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base,
    sumIfState(base_amount, is_swap = 1) AS vol_state_base
    FROM indexer.parsed_transactions_optimized
    WHERE ((is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd > 0) AND (price_base > 0)) OR ((is_launchpad = 1) AND (is_swap = 0))
    GROUP BY
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts;


    CREATE TABLE indexer.mev_sandwich_slots
    (
    quote_mint FixedString(32),
    slot UInt64
    )
    ENGINE = MergeTree
    ORDER BY (quote_mint, slot);

    CREATE MATERIALIZED VIEW indexer.mv_mev_sandwich_slots TO indexer.mev_sandwich_slots
    (
    quote_mint FixedString(32),
    slot UInt64
    )
    AS SELECT
    quote_mint,
    slot
    FROM
    (
    SELECT
    quote_mint,
    slot,
    countIf((method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS buys,
    countIf((method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS sells,
    minIf(price_base, method = 'buy') AS min_buy,
    maxIf(price_base, method = 'sell') AS max_sell
    FROM indexer.parsed_transactions_optimized
    GROUP BY
    quote_mint,
    slot,
    wallet
    HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell)
    )
    GROUP BY
    quote_mint,
    slot;


    INSERT INTO indexer.ohlcv_agg_slot
    SELECT
    quote_mint,
    pool,
    toUInt8(is_launchpad) AS is_launchpad,
    slot AS bucket,
    timestamp AS bucket_ts,
    argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd,
    maxState(price_usd) AS high_state_usd,
    minState(price_usd) AS low_state_usd,
    argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd,
    sumIfState(usd_amount, is_swap = 1) AS vol_state_usd,
    argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base,
    maxState(price_base) AS high_state_base,
    minState(price_base) AS low_state_base,
    argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base,
    sumIfState(base_amount, is_swap = 1) AS vol_state_base
    FROM indexer.parsed_transactions_optimized
    WHERE ((is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd > 0) AND (price_base > 0)) OR ((is_launchpad = 1) AND (is_swap = 0))
    GROUP BY
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts
    SETTINGS
    max_memory_usage = '120G',
    max_threads = '60',
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold = 500000,
    group_by_two_level_threshold_bytes = '640M',
    max_bytes_before_external_sort = '16G';




    -- Break down the complex query into steps to reduce memory usage
    CREATE TEMPORARY TABLE temp_wallet_sandwich_candidates AS
    SELECT
    quote_mint,
    slot,
    wallet,
    countIf((method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS buys,
    countIf((method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS sells,
    minIf(price_base, method = 'buy') AS min_buy,
    maxIf(price_base, method = 'sell') AS max_sell
    FROM indexer.parsed_transactions_optimized
    GROUP BY
    quote_mint,
    slot,
    wallet
    HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell)
    SETTINGS
    max_memory_usage = 0,
    max_threads = 6,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold = 50000,
    group_by_two_level_threshold_bytes = '64M',
    max_bytes_before_external_sort = '16G',
    join_algorithm = 'hash',
    max_rows_in_join = 10000000;

    INSERT INTO indexer.mev_sandwich_slots
    SELECT
    quote_mint,
    slot
    FROM temp_wallet_sandwich_candidates
    GROUP BY
    quote_mint,
    slot
    SETTINGS
    max_memory_usage = 0,
    max_threads = 6,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold = 250000,
    group_by_two_level_threshold_bytes = '320M',
    max_bytes_before_external_sort = '16G';

    DROP TABLE temp_wallet_sandwich_candidates;

    SELECT 'OHLCV rows inserted:', count() FROM indexer.ohlcv_agg_slot;
    SELECT 'MEV rows inserted:', count() FROM indexer.mev_sandwich_slots;

    DROP TABLE IF EXISTS indexer.mv_trade_agg_qm_wallet;

    ALTER TABLE indexer.trade_agg_qm_wallet
    ADD COLUMN buy_usd_amount AggregateFunction(sum, Float64),
    ADD COLUMN sell_usd_amount AggregateFunction(sum, Float64);

    CREATE MATERIALIZED VIEW indexer.mv_trade_agg_qm_wallet TO indexer.trade_agg_qm_wallet
    (
    quote_mint FixedString(32),
    wallet FixedString(32),
    buy_count AggregateFunction(count),
    sell_count AggregateFunction(count),
    buy_base_amount AggregateFunction(sum, UInt64),
    sell_base_amount AggregateFunction(sum, UInt64),
    buy_quote_amount AggregateFunction(sum, UInt64),
    sell_quote_amount AggregateFunction(sum, UInt64),
    launchpad AggregateFunction(any, String),
    buy_usd_amount AggregateFunction(sum, Float64),
    sell_usd_amount AggregateFunction(sum, Float64)
    )
    AS SELECT
    quote_mint,
    wallet,
    countMergeState(buy_count) AS buy_count,
    countMergeState(sell_count) AS sell_count,
    sumMergeState(buy_base_amount) AS buy_base_amount,
    sumMergeState(sell_base_amount) AS sell_base_amount,
    sumMergeState(buy_quote_amount) AS buy_quote_amount,
    sumMergeState(sell_quote_amount) AS sell_quote_amount,
    anyMergeState(launchpad) AS launchpad,
    sumMergeState(buy_usd_amount) AS buy_usd_amount,
    sumMergeState(sell_usd_amount) AS sell_usd_amount
    FROM indexer.trade_aggregates_optimized
    GROUP BY
    quote_mint,
    wallet;

    TRUNCATE TABLE indexer.trade_agg_qm_wallet SETTINGS max_table_size_to_drop = 0 ;

    INSERT INTO indexer.trade_agg_qm_wallet
    SELECT
    quote_mint,
    wallet,
    countMergeState(buy_count) AS buy_count,
    countMergeState(sell_count) AS sell_count,
    sumMergeState(buy_base_amount) AS buy_base_amount,
    sumMergeState(sell_base_amount) AS sell_base_amount,
    sumMergeState(buy_quote_amount) AS buy_quote_amount,
    sumMergeState(sell_quote_amount) AS sell_quote_amount,
    anyMergeState(launchpad) AS launchpad,
    sumMergeState(buy_usd_amount) AS buy_usd_amount,
    sumMergeState(sell_usd_amount) AS sell_usd_amount
    FROM indexer.trade_aggregates_optimized
    GROUP BY
    quote_mint,
    wallet
    SETTINGS
    max_memory_usage = '120G',
    max_threads = 60,
    max_bytes_before_external_group_by = '16G',
    group_by_two_level_threshold = 50000,
    group_by_two_level_threshold_bytes = '64M',
    max_bytes_before_external_sort = '16G';


    SELECT 'Trade aggregation rows updated:', count() FROM indexer.trade_agg_qm_wallet;

    CREATE TABLE indexer.ohlcv_finalized
    (
    quote_mint FixedString(32),
    pool FixedString(32),
    is_launchpad UInt8,
    bucket UInt64,
    bucket_ts DateTime,
    open_usd Float64,
    high_usd Float64,
    low_usd Float64,
    close_usd Float64,
    vol_usd Float64,
    open_base Float64,
    high_base Float64,
    low_base Float64,
    close_base Float64,
    vol_base UInt64
    )
    ENGINE = MergeTree
    ORDER BY (quote_mint, bucket, pool, is_launchpad);

    CREATE MATERIALIZED VIEW indexer.ohlcv_mv TO indexer.ohlcv_finalized AS
    SELECT
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts,
    finalizeAggregation(open_state_usd) AS open_usd,
    finalizeAggregation(high_state_usd) AS high_usd,
    finalizeAggregation(low_state_usd) AS low_usd,
    finalizeAggregation(close_state_usd) AS close_usd,
    finalizeAggregation(vol_state_usd) AS vol_usd,
    finalizeAggregation(open_state_base) AS open_base,
    finalizeAggregation(high_state_base) AS high_base,
    finalizeAggregation(low_state_base) AS low_base,
    finalizeAggregation(close_state_base) AS close_base,
    finalizeAggregation(vol_state_base) AS vol_base
    FROM indexer.ohlcv_agg_slot;


    INSERT INTO indexer.ohlcv_finalized
    SELECT
    quote_mint,
    pool,
    is_launchpad,
    bucket,
    bucket_ts,
    finalizeAggregation(open_state_usd) AS open_usd,
    finalizeAggregation(high_state_usd) AS high_usd,
    finalizeAggregation(low_state_usd) AS low_usd,
    finalizeAggregation(close_state_usd) AS close_usd,
    finalizeAggregation(vol_state_usd) AS vol_usd,
    finalizeAggregation(open_state_base) AS open_base,
    finalizeAggregation(high_state_base) AS high_base,
    finalizeAggregation(low_state_base) AS low_base,
    finalizeAggregation(close_state_base) AS close_base,
    finalizeAggregation(vol_state_base) AS vol_base
    FROM indexer.ohlcv_agg_slot;

    INSERT INTO system_migrations (name)
    VALUES ('003_candles_new_migration');