Last active
October 4, 2025 15:17
-
-
Save kokizzu/50f564c23894a9c8abded6c09b3ae1eb to your computer and use it in GitHub Desktop.
Revisions
-
kokizzu renamed this gist
Oct 3, 2025 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
kokizzu revised this gist
Oct 3, 2025 . 1 changed file with 37 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1390,6 +1390,43 @@ WHERE toYYYYMM(timestamp) = '202508' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202509' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, -
kokizzu revised this gist
Oct 3, 2025 . 1 changed file with 820 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -864,4 +864,823 @@ SELECT 'best_pool_resolved_by_qmint rows:', count() FROM indexer.best_pool_resol /* ---------- Migration bookkeeping ---------- */ INSERT INTO system_migrations (name) VALUES ('004_good_pools_and_metrics'); -- from 005_split_per_date.sql ------------------------------ -- 1. Create the target table for MV CREATE TABLE IF NOT EXISTS indexer.k_parsed_transactions_by_quote ( signature String, slot UInt64, timestamp DateTime64(9), method LowCardinality(String), platform LowCardinality(String), wallet FixedString(32), quote_mint FixedString(32), base_mint FixedString(32), pool String, base_amount Float64, quote_amount Float64, usd_amount Float64, price_base Float64, price_usd Float64, quote_decimals UInt8, base_decimals UInt8, pre_quote_balance Float64, post_quote_balance Float64, pre_base_balance Float64, post_base_balance Float64, base_liquidity Float64, quote_liquidity Float64, base_liquidity_real Float64, quote_liquidity_real Float64, transaction_index UInt32, instruction_index UInt32, inner_instruction_index UInt32, tx_fees Float64, platform_fees Float64, processor_tips Float64, platform_type LowCardinality(String) ) ENGINE = MergeTree ORDER BY (quote_mint, wallet, method, slot, transaction_index, instruction_index, inner_instruction_index); -- 2. Create MV that auto-populates into the target CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.k_mv_parsed_transactions_by_quote TO indexer.k_parsed_transactions_by_quote AS SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized; -- 3. backfill per yyyymm INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202409' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202410' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202411' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202412' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202501' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202502' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202503' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202504' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202505' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202506' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202507' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202508' ; INSERT INTO indexer.k_parsed_transactions_by_quote SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.parsed_transactions_optimized WHERE toYYYYMM(timestamp) = '202509' ; -- 4. check whether it's getting faster or not EXPLAIN indexes = 1 SELECT signature, slot, timestamp, method, platform, wallet, quote_mint, base_mint, pool, base_amount, quote_amount, usd_amount, price_base, price_usd, quote_decimals, base_decimals, pre_quote_balance, post_quote_balance, pre_base_balance, post_base_balance, base_liquidity, quote_liquidity, base_liquidity_real, quote_liquidity_real, transaction_index, instruction_index, inner_instruction_index, tx_fees, platform_fees, processor_tips, platform_type FROM indexer.k_parsed_transactions_by_quote WHERE wallet IN ( SELECT toFixedString(base58Decode(w), 32) FROM (SELECT arrayJoin([ '4GQeEya6ZTwvXre4Br6ZfDyfe2WQMkcDz2QbkJZazVqS', 'cook3vQojECt9LgMUse6Y3PLDzzUmdVGt1Uz8sohGai', '86ca1fHVACfW57vQaGwvx2841WnpphuT5YF3WoHxuouo', 'H3jfDkDCeBgzM5wm7WCwtjdMC48poTAKXbEeWdvKLnVL', '3tc4BVAdzjr1JpeZu6NAjLHyp4kK3iic7TexMBYGJ4Xk', 'RFSqPtn1JfavGiUD4HJsZyYXvZsycxf31hnYfbyG6iB', '8deJ9xeUvXSJwicYptA9mHsU2rN2pDx37KWzkDkEXhU6', 'DfMxre4cKmvogbLrPigxmibVTTQDuzjdXojWzjCXXhzj', /* ⚠️ The next two end with "pump" and are not valid Base58 pubkeys. Remove or replace them, or base58Decode will throw. */ -- '34qXg7fx9AJDmBA1fRs8xMsfNvWQbabkdkEWfGSRpump', '8zFZHuSRuDpuAR7J6FzwyF3vKNx4CVW3DFHJerQhc7Zd', 'FpR5RsXauqdsGhgdW9C6zmBHMkcT9HJvXJQDhSUa4kYe', '3kebnKw7cPdSkLRfiMEALyZJGZ4wdiSRvmoN4rD1yPzV', '4Be9CvxqHW6BYiRAxW9Q3xu1ycTMWaL5z8NX4HR3ha7t', '9jyqFiLnruggwNn4EQwBNFXwpbLM9hrA4hV59ytyAVVz', '3rSZJHysEk2ueFVovRLtZ8LGnQBMZGg96H2Q4jErspAF', 'CRVidEDtEUTYZisCxBZkpELzhQc9eauMLR3FWg74tReL', '73LnJ7G9ffBDjEBGgJDdgvLUhD5APLonKrNiHsKDCw5B', '91HANKimRsWTXMkfcDipLaDVmeTkj6m2eNSVobpkdUNz', 'GQWLRHtR18vy8myoHkgc9SMcSzwUdBjJ816vehSBwcis', '62FZUSWPMX9pofoV1uWHMdzFJRjwMa1LHgh2zhdEB7Zj', 'AVAZvHLR2PcWpDf8BXY4rVxNHYRBytycHkcB5z5QNXYm', '73KcgcTVyLYhnVX3PJbit8e4mC7jU8QotNGBFGix14jn', 'BDC19MFbzDpoF37QCxe4JR6i7UmXNMSiEqrxuQgJoWte', '4vgDxDJubQxmHsaNxgsTESer9Qgk1bGKuE1dN4fXzsoj', '26kZ9rg8Y5pd4j1tdT4cbT8BQRu5uDbXkaVs3L5QasHy', '5TMR5GFPTzkNhprnty2ocXEFLoLur1uchg4NkEr5WEf4', 'G2VzymsKt3zNAn4CKBndYcS67w6Kny5sDEp7Y2W1aTf6', 'GfXQesPe3Zuwg8JhAt6Cg8euJDTVx751enp9EQQmhzPH', 'dVs7zZksjFuq73xbtUC62brFXYYuxCuPSG4wZeGiHck', 'DGPYpCdiVg2shab2TnNiZ2RnsjBQSmhgN71hJyWC5cYn', 'GTvBQnRvAPweU2qmYg8MDLND2PAAyYFKe35aKQGMRDaL', 'EcJWNtETrzdbj8s2dXpaE4Tu4r7fxALD6TNw9H8S6ksz', '7nt58K2KF8HwBiCcRprCYP7civVifXP59fE4x229gAuv', '2QwCvtKyVew25mvDDJDwgopMhwRmNQqqGDsjCBVACo8A', '4aDdi3EiDPMbeZ3e5BvbFMt4vfJaoahaHxZuwKQRtFc1', 'Hg5SEgwdHw8FUoKMPDZbREii3qEXwn6EvvsUdBPM2rmi', '7Dt5oUpxHWuKH8bCTXDLz2j3JyxA7jEmtzqCG6pnh96X', 'AbcX4XBm7DJ3i9p29i6sU8WLmiW4FWY5tiwB9D6UBbcE', 'HrTZPWV4ZPebBiwyzoTBajCD49kQqVwf4dwsLuYG8CXX', '4hSXPtxZgXFpo6Vxq9yqxNjcBoqWN3VoaPJWonUtupzD', 'BrNoqdHUCcv9yTncnZeSjSov8kqhpmzv1nAiPbq1M95H', '28ipXVfkdmu1PDowCHbcSfzkpH9edZmSiVoDhY5xGVfR', 'HsfSzb7BBv4oKNFamgoyBEQ4v6BHUhEREEmBnWzZJXLj', 'ECCKBDWX3MkEcf3bULbLBb9FvrEQLsmPMFTKFpvjzqgP', 'EDmbDc7sY87dKszqyZ3rHczWbKcCyUvJQSJpE3Cg4RcZ', 'DSVc1Rd69sLcXBoZAsvkzK4jGFyJKP77K3J2CaFVNAFP', 'HEibUTHW7JHt7VgKYtWpQCSX7a6YHBmRQrixZs9i4wGY', 'CAUbSmiNuj16phNiskMdwWZEAUXCfXaUSamDFyf7pAa6', '6RoLbZJWJHpTk4sdPsWzocEHiRtzPS36WcBjnMXuQrfU', 'JDd3hy3gQn2V982mi1zqhNqUw1GfV2UL6g76STojCJPN', '9UWZFoiCHeYRLmzmDJhdMrP7wgrTw7DMSpPiT2eHgJHe', 'EXEeLF1YiZxC1tFneoQPv6rcdq5VMUbKNGy3DLPEo2oH', 'D3Z7weHeLGWA7eg1qwVB66NCs8YLxiDTBVE6Eb1tTmwg', 'HVajxfNTWqLGxsfJA9DFThnvddveKfJLK8re1kNpeCVv', '3PgV4tvihn16J6uaQsKeekChCAH7MKyxGQYb7eaRJbMu', 'AT6FsbUy1jNR2AHE35WCVh3nSdDtYADtdhA2KitQXmL5', 'HdxkiXqeN6qpK2YbG51W23QSWj3Yygc1eEk2zwmKJExp', 'Fxj1j5ohVoRos1yQnvx6bsvYCt493VGpyLbMiq4eLff3', '7VBTpiiEjkwRbRGHJFUz6o5fWuhPFtAmy8JGhNqwHNnn', '6LChaYRYtEYjLEHhzo4HdEmgNwu2aia8CM8VhR9wn6n7', 'DNfuF1L62WWyW3pNakVkyGGFzVVhj4Yr52jSmdTyeBHm', '636N7frU8bUwYfyUAtvMQQsXhTFRuSWjxnEZihr5axGV', 'DpNVrtA3ERfKzX4F8Pi2CVykdJJjoNxyY5QgoytAwD26', 'f1BuDDhr9myYFm8X49sQ5RharwsoNPvXDtB18CMA1fu', 'HABhDh9zrzf8mA4SBo1yro8M6AirH2hZdLNPpuvMH6iA', '9ecRdqNCBxwReiY2pxtnLeayeBzE1CrSSmKKJJGdmsaJ', 'F72vY99ihQsYwqEDCfz7igKXA5me6vN2zqVsVUTpw6qL', 'FAwHi11KyJVh2pR2b2vNFEbbunTVkMFpdNBuD2dh9NRf', 'HmBmSYwYEgEZuBUYuDs9xofyqBAkw4ywugB1d7R7sTGh', '7iabBMwmSvS4CFPcjW2XYZY53bUCHzXjCFEFhxeYP4CY', 'AJ6MGExeK7FXmeKkKPmALjcdXVStXYokYNv9uVfDRtvo', 'CyaE1VxvBrahnPWkqm5VsdCvyS2QmNht2UFrKJHga54o', 'BCnqsPEtA1TkgednYEebRpkmwFRJDCjMQcKZMMtEdArc', '7Ny3AmDQ1cnQ29ywXdQheaoeTPvo6NGkDDtGa5bK9wRH', /* ⚠️ Same here — ends with "pump". Remove or replace. */ -- '36Qg5TussomUkM7g9FUR8KNcUNwDPbL8jdurKvw3pump', 'suqh5sHtr8HyJ7q8scBimULPkPpA557prMG47xCHQfK', '4zq1iLpmepj2Rj7W6A3XQMRQA1HyjYqVpZiBzM6aPyH7', '4gbKBZQ8aaXfmuwq2bbN6cFJEhLm1uxHv2ravrkYTgBU', 'o5e4MABwiMuhnwXWUPgwnhvGPK91i2ytnWZvAz2begi', '7SDs3PjT2mswKQ7Zo4FTucn9gJdtuW4jaacPA65BseHS', '77kq9xNZg4Vb94UJMpeHsx7pKXUBU3Wpqsm5UXhjA9dd', '831qmkeGhfL8YpcXuhrug6nHj1YdK3aXMDQUCo85Auh1' ]) AS w) ) AND quote_mint = toFixedString(base58Decode('MEFNBXixkEbait3xn9bkm8WsJzXtVsaJEn4c8Sam21u'), 32) AND method IN ('buy', 'sell') ORDER BY (slot, transaction_index, instruction_index, inner_instruction_index) DESC LIMIT 1000 OFFSET 0 SETTINGS optimize_read_in_order=0, enable_filesystem_cache=0; -- to undo /* DROP VIEW indexer.k_mv_parsed_transactions_by_quote; DROP TABLE indexer.k_parsed_transactions_by_quote SETTINGS max_table_size_to_drop = 0; */ INSERT INTO system_migrations (name) VALUES ('005_wallet_quote_mint'); -- from 006_various_changes.sql ------------------------------- /* =============================== MIGRATION 006 — Various changes =============================== */ USE indexer; SET max_memory_usage = 480000000000, max_execution_time = 0, send_timeout = 0, receive_timeout = 0, optimize_on_insert = 0, max_rows_to_read = 0, max_bytes_to_read = 0, max_threads = 56, max_bytes_before_external_group_by = '100G', group_by_two_level_threshold = 500000, group_by_two_level_threshold_bytes = '8G', max_bytes_before_external_sort = '100G', allow_nondeterministic_mutations = 1, max_table_size_to_drop = 0; DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint_mv; DROP TABLE IF EXISTS indexer.mv_trade_volume_hourly_by_mint; ALTER TABLE indexer.parsed_new_token_transactions_optimized DROP PROJECTION IF EXISTS uri_search_projection; /* ---------- Trading Volume Hourly Materialized View ---------- */ CREATE TABLE indexer.mv_trade_volume_hourly_by_mint ( `mint` FixedString(32), `hour` DateTime('UTC'), `total_volume` AggregateFunction(sum, Float64), `buy_volume` AggregateFunction(sum, Float64), `sell_volume` AggregateFunction(sum, Float64), `total_txns` AggregateFunction(count), `buy_txns` AggregateFunction(count), `sell_txns` AggregateFunction(count), PROJECTION hour_projection ( SELECT * ORDER BY hour ), PROJECTION mint_hour_projection ( SELECT * ORDER BY mint, hour ) ) ENGINE = AggregatingMergeTree PARTITION BY toYYYYMM(hour) ORDER BY (mint, hour) SETTINGS index_granularity = 8192, deduplicate_merge_projection_mode = 'rebuild'; CREATE MATERIALIZED VIEW indexer.mv_trade_volume_hourly_by_mint_mv TO indexer.mv_trade_volume_hourly_by_mint AS SELECT quote_mint AS mint, toStartOfHour(timestamp) AS hour, sumState(usd_amount) AS total_volume, sumStateIf(usd_amount, method = 'buy') AS buy_volume, sumStateIf(usd_amount, method = 'sell') AS sell_volume, countState() AS total_txns, countStateIf(method = 'buy') AS buy_txns, countStateIf(method = 'sell') AS sell_txns FROM indexer.parsed_transactions_optimized WHERE is_swap = 1 GROUP BY mint, hour; /* ---------- URI Search Projection ---------- */ ALTER TABLE indexer.parsed_new_token_transactions_optimized ADD PROJECTION uri_search_projection ( SELECT * ORDER BY uri, timestamp ); ALTER TABLE indexer.parsed_new_token_transactions_optimized MATERIALIZE PROJECTION uri_search_projection; INSERT INTO indexer.mv_trade_volume_hourly_by_mint SELECT quote_mint AS mint, toStartOfHour(timestamp) AS hour, sumState(usd_amount) AS total_volume, sumStateIf(usd_amount, method = 'buy') AS buy_volume, sumStateIf(usd_amount, method = 'sell') AS sell_volume, countState() AS total_txns, countStateIf(method = 'buy') AS buy_txns, countStateIf(method = 'sell') AS sell_txns FROM indexer.parsed_transactions_optimized WHERE is_swap = 1 GROUP BY mint, hour; /* ---------- Migration bookkeeping ---------- */ INSERT INTO system_migrations (name) VALUES ('006_various_changes'); -
kokizzu revised this gist
Oct 3, 2025 . 1 changed file with 99 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -766,3 +766,102 @@ GROUP BY if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String))); /* ---------- Backfill best_pool_resolved_by_qmint (with embedded metrics) ---------- */ INSERT INTO indexer.best_pool_resolved_by_qmint WITH toFixedString('', 32) AS empty32, ( empty32, CAST('' AS LowCardinality(String)), CAST('' AS LowCardinality(String)) ) AS empty_tuple SELECT q.quote_mint, /* identity columns for the primary (resolved) pool */ primary_tuple.1 AS pool, primary_tuple.2 AS platform, primary_tuple.3 AS launchpad, CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool) AS is_launchpad, CAST(q.mig IS NOT NULL AS Bool) AS is_migration, /* historical (optional) identity */ if( q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32))) ) AS historical_pool, if( q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String)) ) AS historical_platform, if( q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)) ) AS historical_launchpad, /* === merged metrics across chosen pools === */ countIfMergeState(p.buy_count) AS buy_count, countIfMergeState(p.sell_count) AS sell_count, sumMergeState(p.buy_base_amount) AS buy_base_amount, sumMergeState(p.sell_base_amount) AS sell_base_amount, sumMergeState(p.buy_usd_amount) AS buy_usd_amount, sumMergeState(p.sell_usd_amount) AS sell_usd_amount, argMaxMergeState(p.price_base) AS price_base, argMaxMergeState(p.price_usd) AS price_usd, argMaxMergeState(p.base_liquidity) AS base_liquidity, argMaxMergeState(p.quote_liquidity) AS quote_liquidity, argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real, argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real, minMergeState(p.first_transaction_time) AS first_transaction_time FROM ( /* Resolve the tuple set per quote_mint */ SELECT quote_mint, if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple) AS mig, if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple) AS lp, if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) AS best, coalesce( if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple), if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple), if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) ) AS primary_tuple FROM indexer.best_pools_by_qmint_agg GROUP BY quote_mint ) AS q /* expand to the set of chosen pools: primary + (optional) historical lp */ ARRAY JOIN arrayFilter(x -> x.1 != empty32, [ primary_tuple, if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp, empty_tuple) ]) AS chosen_tuple ANY LEFT JOIN indexer.pool_aggregates_optimized AS p ON p.pool = chosen_tuple.1 GROUP BY q.quote_mint, primary_tuple.1, primary_tuple.2, primary_tuple.3, CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool), CAST(q.mig IS NOT NULL AS Bool), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String))) SETTINGS max_memory_usage = '120G', max_bytes_in_join = '120G', join_algorithm = 'partial_merge', max_threads = 32; /* ---------- Counts for quick sanity ---------- */ SELECT 'pool_aggregates_optimized rows:', count() FROM indexer.pool_aggregates_optimized; SELECT 'best_pools_by_qmint_agg rows:', count() FROM indexer.best_pools_by_qmint_agg; SELECT 'best_pool_resolved_by_qmint rows:', count() FROM indexer.best_pool_resolved_by_qmint; /* ---------- Migration bookkeeping ---------- */ INSERT INTO system_migrations (name) VALUES ('004_good_pools_and_metrics'); -
kokizzu revised this gist
Oct 3, 2025 . 1 changed file with 116 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -650,3 +650,119 @@ GROUP BY quote_mint SETTINGS max_memory_usage = '80G', max_threads = 32; /* ============================================== CHANGE #3 — Resolve single best (and historical) + embed ALL metrics directly here ============================================== */ CREATE TABLE IF NOT EXISTS indexer.best_pool_resolved_by_qmint ( quote_mint FixedString(32), pool FixedString(32), platform LowCardinality(String), launchpad LowCardinality(String), is_launchpad Bool, is_migration Bool, historical_pool Nullable(FixedString(32)), historical_platform Nullable(String), -- previously Nullable(LowCardinality(String historical_launchpad Nullable(String), -- previously Nullable(LowCardinality(String /* === embedded metrics aggregated across the chosen pools (primary + optional historical) === */ buy_count AggregateFunction(countIf, Bool), sell_count AggregateFunction(countIf, Bool), buy_base_amount AggregateFunction(sum, UInt64), sell_base_amount AggregateFunction(sum, UInt64), buy_usd_amount AggregateFunction(sum, Float64), sell_usd_amount AggregateFunction(sum, Float64), price_base AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)), price_usd AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)), base_liquidity AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), quote_liquidity AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), base_liquidity_real AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), quote_liquidity_real AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), first_transaction_time AggregateFunction(min, DateTime('UTC')) ) ENGINE = ReplacingMergeTree ORDER BY quote_mint; CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_pool_resolved_by_qmint TO indexer.best_pool_resolved_by_qmint AS WITH toFixedString('', 32) AS empty32, ( empty32, CAST('' AS LowCardinality(String)), CAST('' AS LowCardinality(String)) ) AS empty_tuple SELECT q.quote_mint, /* identity columns for the primary (resolved) pool */ primary_tuple.1 AS pool, primary_tuple.2 AS platform, primary_tuple.3 AS launchpad, CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool) AS is_launchpad, CAST(q.mig IS NOT NULL AS Bool) AS is_migration, /* historical (optional) identity */ if( q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32))) ) AS historical_pool, if( q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String)) ) AS historical_platform, if( q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String)) ) AS historical_launchpad, /* === merged metrics across chosen pools === */ countIfMergeState(p.buy_count) AS buy_count, countIfMergeState(p.sell_count) AS sell_count, sumMergeState(p.buy_base_amount) AS buy_base_amount, sumMergeState(p.sell_base_amount) AS sell_base_amount, sumMergeState(p.buy_usd_amount) AS buy_usd_amount, sumMergeState(p.sell_usd_amount) AS sell_usd_amount, argMaxMergeState(p.price_base) AS price_base, argMaxMergeState(p.price_usd) AS price_usd, argMaxMergeState(p.base_liquidity) AS base_liquidity, argMaxMergeState(p.quote_liquidity) AS quote_liquidity, argMaxMergeState(p.base_liquidity_real) AS base_liquidity_real, argMaxMergeState(p.quote_liquidity_real) AS quote_liquidity_real, minMergeState(p.first_transaction_time) AS first_transaction_time FROM ( /* Resolve the tuple set per quote_mint */ SELECT quote_mint, if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple) AS mig, if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple) AS lp, if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) AS best, coalesce( if(anyMerge(migration_state) != empty_tuple, anyMerge(migration_state), empty_tuple), if(anyMerge(launchpad_state) != empty_tuple, anyMerge(launchpad_state), empty_tuple), if(argMaxMerge(best_liq_state) != empty_tuple, argMaxMerge(best_liq_state), empty_tuple) ) AS primary_tuple FROM indexer.best_pools_by_qmint_agg GROUP BY quote_mint ) AS q /* expand to the set of chosen pools: primary + (optional) historical lp */ ARRAY JOIN arrayFilter(x -> x.1 != empty32, [ primary_tuple, if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp, empty_tuple) ]) AS chosen_tuple ANY LEFT JOIN indexer.pool_aggregates_optimized AS p ON p.pool = chosen_tuple.1 GROUP BY q.quote_mint, primary_tuple.1, primary_tuple.2, primary_tuple.3, CAST(q.mig IS NULL AND q.lp IS NOT NULL AS Bool), CAST(q.mig IS NOT NULL AS Bool), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.1, CAST(NULL AS Nullable(FixedString(32)))), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.2, CAST(NULL AS Nullable(String))), if(q.mig IS NOT NULL AND q.lp IS NOT NULL AND q.lp.1 != q.mig.1, q.lp.3, CAST(NULL AS Nullable(String))); -
kokizzu revised this gist
Oct 3, 2025 . 1 changed file with 87 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -563,3 +563,90 @@ SETTINGS group_by_two_level_threshold_bytes = '96M', max_bytes_before_external_sort = '16G'; /* ============================================== CHANGE #2 — Best pools per quote_mint (agg) ============================================== */ CREATE TABLE IF NOT EXISTS indexer.best_pools_by_qmint_agg ( quote_mint FixedString(32), -- official pools migration_state AggregateFunction(any, Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))), launchpad_state AggregateFunction(any, Tuple(FixedString(32), LowCardinality(String), LowCardinality(String))), -- best non-official by latest known liquidity (UInt64) best_liq_state AggregateFunction(argMax, Tuple(FixedString(32), LowCardinality(String), LowCardinality(String)), UInt64) ) ENGINE = ReplicatedAggregatingMergeTree( '/clickhouse/tables/{shard}/indexer/best_pools_by_qmint_agg', '{replica}' ) ORDER BY quote_mint; CREATE MATERIALIZED VIEW IF NOT EXISTS indexer.mv_best_from_snapshot TO indexer.best_pools_by_qmint_agg AS SELECT quote_mint, -- unique migration pool per mint anyStateIf( (pool, platform, launchpad), is_migration = 1 ) AS migration_state, -- unique launchpad pool per mint (non-empty launchpad label) anyStateIf( (pool, platform, launchpad), launchpad <> '' ) AS launchpad_state, -- best non-official by latest liquidity snapshot argMaxStateIf( (pool, platform, launchpad), last_liq, (is_migration = 0) AND (launchpad = '') ) AS best_liq_state FROM ( /* Snapshot per pool (functionally dependent columns via any*; no nested aggs above this level) */ SELECT any(quote_mint) AS quote_mint, pool, any(platform) AS platform, any(launchpad) AS launchpad, -- latest known liquidity per pool (UInt64) argMaxMerge(quote_liquidity_real) AS last_liq, -- ever-true migration flag maxMerge(is_migration_state) AS is_migration FROM indexer.pool_aggregates_optimized GROUP BY pool ) GROUP BY quote_mint; /* ---------- Backfill best_pools_by_qmint_agg ---------- */ INSERT INTO indexer.best_pools_by_qmint_agg SELECT quote_mint, anyStateIf( (pool, platform, launchpad), is_migration = 1 ) AS migration_state, anyStateIf( (pool, platform, launchpad), launchpad <> '' ) AS launchpad_state, argMaxStateIf( (pool, platform, launchpad), last_liq, (is_migration = 0) AND (launchpad = '') ) AS best_liq_state FROM ( SELECT any(quote_mint) AS quote_mint, pool, any(platform) AS platform, any(launchpad) AS launchpad, argMaxMerge(quote_liquidity_real) AS last_liq, maxMerge(is_migration_state) AS is_migration FROM indexer.pool_aggregates_optimized GROUP BY pool ) GROUP BY quote_mint SETTINGS max_memory_usage = '80G', max_threads = 32; -
kokizzu renamed this gist
Oct 3, 2025 . 1 changed file with 208 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,6 @@ -- from 003_candles_new_migration.sql ------------------------------------- USE indexer; SET max_memory_usage = 0, max_execution_time = 0, @@ -355,4 +358,208 @@ SELECT FROM indexer.ohlcv_agg_slot; INSERT INTO system_migrations (name) VALUES ('003_candles_new_migration'); -- from 004_pools_and_metrics__pool_aggregates_optimized2.sql ------------------------------------------------------------- /* =============================== MIGRATION 004 — Good Pools & Metrics (metrics embedded in best_pool_resolved_by_qmint) =============================== */ USE indexer; SET max_memory_usage = '150G', max_execution_time = 0, send_timeout = 0, receive_timeout = 0, optimize_on_insert = 0, max_rows_to_read = 0, max_bytes_to_read = 0, max_threads = 8, max_bytes_before_external_group_by = '3G', group_by_two_level_threshold = 100000, group_by_two_level_threshold_bytes = '128M', max_bytes_before_external_sort = '2G', allow_nondeterministic_mutations = 1, max_table_size_to_drop = 0; /* ---------- DROP old objects in dependency order (if exist) ---------- */ DROP VIEW IF EXISTS indexer.mv_best_pool_metrics_by_qmint; DROP TABLE IF EXISTS indexer.best_pool_metrics_by_qmint; DROP VIEW IF EXISTS indexer.mv_best_pool_resolved_by_qmint; DROP TABLE IF EXISTS indexer.best_pool_resolved_by_qmint; DROP VIEW IF EXISTS indexer.mv_best_from_snapshot; DROP TABLE IF EXISTS indexer.best_pools_by_qmint_agg; DROP VIEW IF EXISTS indexer.mv_pool_aggregates_optimized; DROP TABLE IF EXISTS indexer.pool_aggregates_optimized; DROP VIEW IF EXISTS indexer.mv_pool_aggregates_optimized2; DROP TABLE IF EXISTS indexer.pool_aggregates_optimized2; /* ============================================== CHANGE #1 — Pool aggregates (optimized) ============================================== */ CREATE TABLE indexer.pool_aggregates_optimized ( /* functionally dependent on pool */ `base_mint` FixedString(32), `quote_mint` FixedString(32), `pool` FixedString(32), /* labels */ `platform` LowCardinality(String), `launchpad` LowCardinality(String), /* ever-true flags (logical OR over history) */ `is_migration_state` AggregateFunction(max, Bool), /* counters & sums */ `buy_count` AggregateFunction(countIf, Bool), `sell_count` AggregateFunction(countIf, Bool), `buy_base_amount` AggregateFunction(sum, UInt64), `sell_base_amount` AggregateFunction(sum, UInt64), `buy_usd_amount` AggregateFunction(sum, Float64), `sell_usd_amount` AggregateFunction(sum, Float64), /* recency-weighted by position tuple */ `price_base` AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)), `price_usd` AggregateFunction(argMax, Float64, Tuple(UInt64, UInt32, Int8, Int8)), `base_liquidity` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), `quote_liquidity` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), `base_liquidity_real` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), `quote_liquidity_real` AggregateFunction(argMax, UInt64, Tuple(UInt64, UInt32, Int8, Int8)), /* earliest position */ `first_transaction_time` AggregateFunction(min, DateTime('UTC')), /* order-only projection */ PROJECTION quote_mint_projection ( SELECT * ORDER BY quote_mint, pool ) ) ENGINE = ReplicatedAggregatingMergeTree( '/clickhouse/tables/{shard}/indexer/pool_aggregates_optimized_new', '{replica}' ) ORDER BY pool SETTINGS index_granularity = 2048, deduplicate_merge_projection_mode = 'drop'; CREATE MATERIALIZED VIEW indexer.mv_pool_aggregates_optimized TO indexer.pool_aggregates_optimized AS SELECT /* states / labels via any() because we GROUP BY pool */ any(base_mint) AS base_mint, any(quote_mint) AS quote_mint, pool, any(platform) AS platform, any(launchpad) AS launchpad, /* ever-true flags */ maxState(is_migration) AS is_migration_state, /* measures */ countIfState(method = 'buy') AS buy_count, countIfState(method = 'sell') AS sell_count, sumIfState(base_amount, method = 'buy') AS buy_base_amount, sumIfState(base_amount, method = 'sell') AS sell_base_amount, sumIfState(usd_amount, method = 'buy') AS buy_usd_amount, sumIfState(usd_amount, method = 'sell') AS sell_usd_amount, /* recency by position tuple (slot, tx_idx, ix_idx, inner_ix_idx) Apply the ≥100 base/quote thresholds and require swaps for prices. */ argMaxStateIf( price_base, (slot, transaction_index, instruction_index, inner_instruction_index), (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) ) AS price_base, argMaxStateIf( price_usd, (slot, transaction_index, instruction_index, inner_instruction_index), (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) ) AS price_usd, /* liquidity snapshots by recency (no additional filter) */ argMaxState( base_liquidity, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS base_liquidity, argMaxState( quote_liquidity, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS quote_liquidity, argMaxState( base_liquidity_real, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS base_liquidity_real, argMaxState( quote_liquidity_real, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS quote_liquidity_real, /* earliest observed ts */ minState(timestamp) AS first_transaction_time FROM indexer.parsed_transactions_optimized GROUP BY pool; /* ---------- Backfill pool_aggregates_optimized ---------- */ INSERT INTO indexer.pool_aggregates_optimized SELECT any(base_mint) AS base_mint, any(quote_mint) AS quote_mint, pool, any(platform) AS platform, any(launchpad) AS launchpad, maxState(is_migration) AS is_migration_state, countIfState(method = 'buy') AS buy_count, countIfState(method = 'sell') AS sell_count, sumIfState(base_amount, method = 'buy') AS buy_base_amount, sumIfState(base_amount, method = 'sell') AS sell_base_amount, sumIfState(usd_amount, method = 'buy') AS buy_usd_amount, sumIfState(usd_amount, method = 'sell') AS sell_usd_amount, argMaxStateIf( price_base, (slot, transaction_index, instruction_index, inner_instruction_index), (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) ) AS price_base, argMaxStateIf( price_usd, (slot, transaction_index, instruction_index, inner_instruction_index), (is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) ) AS price_usd, argMaxState( base_liquidity, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS base_liquidity, argMaxState( quote_liquidity, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS quote_liquidity, argMaxState( base_liquidity_real, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS base_liquidity_real, argMaxState( quote_liquidity_real, (slot, transaction_index, instruction_index, inner_instruction_index) ) AS quote_liquidity_real, minState(timestamp) AS first_transaction_time FROM indexer.parsed_transactions_optimized GROUP BY pool SETTINGS max_memory_usage = '120G', max_threads = 16, max_bytes_before_external_group_by = '16G', group_by_two_level_threshold = 80000, group_by_two_level_threshold_bytes = '96M', max_bytes_before_external_sort = '16G'; -
kokizzu created this gist
Oct 3, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,358 @@ USE indexer; SET max_memory_usage = 0, max_execution_time = 0, send_timeout = 0, receive_timeout = 0, optimize_on_insert = 0, max_rows_to_read = 0, max_bytes_to_read = 0, max_threads = 8, max_bytes_before_external_group_by = '3G', group_by_two_level_threshold = 100000, group_by_two_level_threshold_bytes = '128M', max_bytes_before_external_sort = '2G', allow_nondeterministic_mutations = 1, max_table_size_to_drop = 0; DROP TABLE IF EXISTS indexer.mv_ohlcv_agg_slot; DROP TABLE IF EXISTS indexer.ohlcv_agg_slot; DROP TABLE IF EXISTS indexer.mv_mev_sandwich_slots; DROP TABLE IF EXISTS indexer.mev_sandwich_slots; CREATE TABLE indexer.ohlcv_agg_slot ( quote_mint FixedString(32), pool FixedString(32), is_launchpad UInt8, bucket UInt64, bucket_ts DateTime, open_state_usd AggregateFunction(argMin, Float64, Tuple( UInt32, Int8, Int8)), high_state_usd AggregateFunction(max, Float64), low_state_usd AggregateFunction(min, Float64), close_state_usd AggregateFunction(argMax, Float64, Tuple( UInt32, Int8, Int8)), vol_state_usd AggregateFunction(sumIf, Float64, UInt8), open_state_base AggregateFunction(argMin, Float64, Tuple( UInt32, Int8, Int8)), high_state_base AggregateFunction(max, Float64), low_state_base AggregateFunction(min, Float64), close_state_base AggregateFunction(argMax, Float64, Tuple( UInt32, Int8, Int8)), vol_state_base AggregateFunction(sumIf, UInt64, UInt8) ) ENGINE = AggregatingMergeTree ORDER BY (quote_mint, bucket, pool, is_launchpad); CREATE MATERIALIZED VIEW indexer.mv_ohlcv_agg_slot TO indexer.ohlcv_agg_slot ( quote_mint FixedString(32), pool FixedString(32), is_launchpad UInt8, bucket UInt64, bucket_ts DateTime('UTC'), open_state_usd AggregateFunction(argMin, Float64, Tuple( UInt32, Int8, Int8)), high_state_usd AggregateFunction(max, Float64), low_state_usd AggregateFunction(min, Float64), close_state_usd AggregateFunction(argMax, Float64, Tuple( UInt32, Int8, Int8)), vol_state_usd AggregateFunction(sumIf, Float64, UInt8), open_state_base AggregateFunction(argMin, Float64, Tuple( UInt32, Int8, Int8)), high_state_base AggregateFunction(max, Float64), low_state_base AggregateFunction(min, Float64), close_state_base AggregateFunction(argMax, Float64, Tuple( UInt32, Int8, Int8)), vol_state_base AggregateFunction(sumIf, UInt64, UInt8) ) AS SELECT quote_mint, pool, toUInt8(is_launchpad) AS is_launchpad, slot AS bucket, timestamp AS bucket_ts, argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd, maxState(price_usd) AS high_state_usd, minState(price_usd) AS low_state_usd, argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd, sumIfState(usd_amount, is_swap = 1) AS vol_state_usd, argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base, maxState(price_base) AS high_state_base, minState(price_base) AS low_state_base, argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base, sumIfState(base_amount, is_swap = 1) AS vol_state_base FROM indexer.parsed_transactions_optimized WHERE ((is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd > 0) AND (price_base > 0)) OR ((is_launchpad = 1) AND (is_swap = 0)) GROUP BY quote_mint, pool, is_launchpad, bucket, bucket_ts; CREATE TABLE indexer.mev_sandwich_slots ( quote_mint FixedString(32), slot UInt64 ) ENGINE = MergeTree ORDER BY (quote_mint, slot); CREATE MATERIALIZED VIEW indexer.mv_mev_sandwich_slots TO indexer.mev_sandwich_slots ( quote_mint FixedString(32), slot UInt64 ) AS SELECT quote_mint, slot FROM ( SELECT quote_mint, slot, countIf((method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS buys, countIf((method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS sells, minIf(price_base, method = 'buy') AS min_buy, maxIf(price_base, method = 'sell') AS max_sell FROM indexer.parsed_transactions_optimized GROUP BY quote_mint, slot, wallet HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell) ) GROUP BY quote_mint, slot; INSERT INTO indexer.ohlcv_agg_slot SELECT quote_mint, pool, toUInt8(is_launchpad) AS is_launchpad, slot AS bucket, timestamp AS bucket_ts, argMinState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_usd, maxState(price_usd) AS high_state_usd, minState(price_usd) AS low_state_usd, argMaxState(price_usd, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_usd, sumIfState(usd_amount, is_swap = 1) AS vol_state_usd, argMinState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS open_state_base, maxState(price_base) AS high_state_base, minState(price_base) AS low_state_base, argMaxState(price_base, (transaction_index, instruction_index, inner_instruction_index)) AS close_state_base, sumIfState(base_amount, is_swap = 1) AS vol_state_base FROM indexer.parsed_transactions_optimized WHERE ((is_swap = 1) AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd > 0) AND (price_base > 0)) OR ((is_launchpad = 1) AND (is_swap = 0)) GROUP BY quote_mint, pool, is_launchpad, bucket, bucket_ts SETTINGS max_memory_usage = '120G', max_threads = '60', max_bytes_before_external_group_by = '16G', group_by_two_level_threshold = 500000, group_by_two_level_threshold_bytes = '640M', max_bytes_before_external_sort = '16G'; -- Break down the complex query into steps to reduce memory usage CREATE TEMPORARY TABLE temp_wallet_sandwich_candidates AS SELECT quote_mint, slot, wallet, countIf((method = 'buy') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS buys, countIf((method = 'sell') AND (base_amount >= 100) AND (quote_amount >= 100) AND (price_usd != 0) AND (price_base != 0)) AS sells, minIf(price_base, method = 'buy') AS min_buy, maxIf(price_base, method = 'sell') AS max_sell FROM indexer.parsed_transactions_optimized GROUP BY quote_mint, slot, wallet HAVING (buys > 0) AND (sells > 0) AND (min_buy < max_sell) SETTINGS max_memory_usage = 0, max_threads = 6, max_bytes_before_external_group_by = '16G', group_by_two_level_threshold = 50000, group_by_two_level_threshold_bytes = '64M', max_bytes_before_external_sort = '16G', join_algorithm = 'hash', max_rows_in_join = 10000000; INSERT INTO indexer.mev_sandwich_slots SELECT quote_mint, slot FROM temp_wallet_sandwich_candidates GROUP BY quote_mint, slot SETTINGS max_memory_usage = 0, max_threads = 6, max_bytes_before_external_group_by = '16G', group_by_two_level_threshold = 250000, group_by_two_level_threshold_bytes = '320M', max_bytes_before_external_sort = '16G'; DROP TABLE temp_wallet_sandwich_candidates; SELECT 'OHLCV rows inserted:', count() FROM indexer.ohlcv_agg_slot; SELECT 'MEV rows inserted:', count() FROM indexer.mev_sandwich_slots; DROP TABLE IF EXISTS indexer.mv_trade_agg_qm_wallet; ALTER TABLE indexer.trade_agg_qm_wallet ADD COLUMN buy_usd_amount AggregateFunction(sum, Float64), ADD COLUMN sell_usd_amount AggregateFunction(sum, Float64); CREATE MATERIALIZED VIEW indexer.mv_trade_agg_qm_wallet TO indexer.trade_agg_qm_wallet ( quote_mint FixedString(32), wallet FixedString(32), buy_count AggregateFunction(count), sell_count AggregateFunction(count), buy_base_amount AggregateFunction(sum, UInt64), sell_base_amount AggregateFunction(sum, UInt64), buy_quote_amount AggregateFunction(sum, UInt64), sell_quote_amount AggregateFunction(sum, UInt64), launchpad AggregateFunction(any, String), buy_usd_amount AggregateFunction(sum, Float64), sell_usd_amount AggregateFunction(sum, Float64) ) AS SELECT quote_mint, wallet, countMergeState(buy_count) AS buy_count, countMergeState(sell_count) AS sell_count, sumMergeState(buy_base_amount) AS buy_base_amount, sumMergeState(sell_base_amount) AS sell_base_amount, sumMergeState(buy_quote_amount) AS buy_quote_amount, sumMergeState(sell_quote_amount) AS sell_quote_amount, anyMergeState(launchpad) AS launchpad, sumMergeState(buy_usd_amount) AS buy_usd_amount, sumMergeState(sell_usd_amount) AS sell_usd_amount FROM indexer.trade_aggregates_optimized GROUP BY quote_mint, wallet; TRUNCATE TABLE indexer.trade_agg_qm_wallet SETTINGS max_table_size_to_drop = 0 ; INSERT INTO indexer.trade_agg_qm_wallet SELECT quote_mint, wallet, countMergeState(buy_count) AS buy_count, countMergeState(sell_count) AS sell_count, sumMergeState(buy_base_amount) AS buy_base_amount, sumMergeState(sell_base_amount) AS sell_base_amount, sumMergeState(buy_quote_amount) AS buy_quote_amount, sumMergeState(sell_quote_amount) AS sell_quote_amount, anyMergeState(launchpad) AS launchpad, sumMergeState(buy_usd_amount) AS buy_usd_amount, sumMergeState(sell_usd_amount) AS sell_usd_amount FROM indexer.trade_aggregates_optimized GROUP BY quote_mint, wallet SETTINGS max_memory_usage = '120G', max_threads = 60, max_bytes_before_external_group_by = '16G', group_by_two_level_threshold = 50000, group_by_two_level_threshold_bytes = '64M', max_bytes_before_external_sort = '16G'; SELECT 'Trade aggregation rows updated:', count() FROM indexer.trade_agg_qm_wallet; CREATE TABLE indexer.ohlcv_finalized ( quote_mint FixedString(32), pool FixedString(32), is_launchpad UInt8, bucket UInt64, bucket_ts DateTime, open_usd Float64, high_usd Float64, low_usd Float64, close_usd Float64, vol_usd Float64, open_base Float64, high_base Float64, low_base Float64, close_base Float64, vol_base UInt64 ) ENGINE = MergeTree ORDER BY (quote_mint, bucket, pool, is_launchpad); CREATE MATERIALIZED VIEW indexer.ohlcv_mv TO indexer.ohlcv_finalized AS SELECT quote_mint, pool, is_launchpad, bucket, bucket_ts, finalizeAggregation(open_state_usd) AS open_usd, finalizeAggregation(high_state_usd) AS high_usd, finalizeAggregation(low_state_usd) AS low_usd, finalizeAggregation(close_state_usd) AS close_usd, finalizeAggregation(vol_state_usd) AS vol_usd, finalizeAggregation(open_state_base) AS open_base, finalizeAggregation(high_state_base) AS high_base, finalizeAggregation(low_state_base) AS low_base, finalizeAggregation(close_state_base) AS close_base, finalizeAggregation(vol_state_base) AS vol_base FROM indexer.ohlcv_agg_slot; INSERT INTO indexer.ohlcv_finalized SELECT quote_mint, pool, is_launchpad, bucket, bucket_ts, finalizeAggregation(open_state_usd) AS open_usd, finalizeAggregation(high_state_usd) AS high_usd, finalizeAggregation(low_state_usd) AS low_usd, finalizeAggregation(close_state_usd) AS close_usd, finalizeAggregation(vol_state_usd) AS vol_usd, finalizeAggregation(open_state_base) AS open_base, finalizeAggregation(high_state_base) AS high_base, finalizeAggregation(low_state_base) AS low_base, finalizeAggregation(close_state_base) AS close_base, finalizeAggregation(vol_state_base) AS vol_base FROM indexer.ohlcv_agg_slot; INSERT INTO system_migrations (name) VALUES ('003_candles_new_migration');