require 'bundler/inline'

gemfile do
  source 'https://rubygems.org'
  gem 'timescaledb'
  gem 'bulk_insert'
  gem 'pry'
end

require 'benchmark'
require 'time'
require 'timescaledb'

# A single gem download parsed from an access log, stored in a TimescaleDB
# hypertable whose time column is +ts+. Also owns the read-only model classes
# for every continuous aggregate built on top of it (Download::PerMinute, ...).
class Download < ActiveRecord::Base
  acts_as_hypertable time_column: 'ts'

  # Named SELECT fragments accepted by the aggregate classes' +rollup+ and
  # +per_*+ scopes; any value not listed here is interpolated verbatim.
  #
  # Defined here rather than inside the Class.new block below: a constant
  # assigned in a block belongs to the enclosing lexical scope (Download), so
  # it used to be reassigned -- with an "already initialized constant"
  # warning -- once per aggregate class.
  QUERIES = {
    sum_downloads: "sum(downloads)::bigint as downloads",
    sum_downloads_per_gem: "gem_name, sum(downloads)::bigint as downloads",
    sum_downloads_per_version: "gem_name, gem_version, sum(downloads)::bigint as downloads",
    avg_downloads: "avg(downloads)::bigint as avg_downloads",
    rollup: "rollup(stats_agg) as stats_agg",
    rollup_per_version: "gem_name, gem_version, rollup(stats_agg) as stats_agg",
    rolling_downloads: "rolling(stats_agg)->sum() as downloads",
    rollup_downloads: "rollup(stats_agg)->sum() as downloads",
    rollup_downloads_per_gem: "gem_name,rollup(stats_agg)->sum() as downloads",
    rollup_downloads_per_version: "gem_name, gem_version, rollup(stats_agg)->sum() as downloads"
  }.freeze

  # Buckets rows into +range+ intervals on the hypertable's time column and
  # selects +query+ alongside the bucket.
  scope :time_bucket, ->(range = '1m', query = "count(*)") do
    select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}")
  end

  # One-minute buckets grouped by the bucket column.
  scope :per_minute, ->(query = "count(*) as downloads") do
    time_bucket('1m', query).group(1)
  end

  # Per-minute stats_agg, kept per gem name and version.
  scope :stats_agg_per_minute, -> do
    per_minute("gem_name, gem_version, stats_agg(cast(1.0 as double precision)) as stats_agg")
      .group(1, 2, 3)
  end

  # Per-minute download counts per gem name.
  scope :gems_per_minute, -> do
    per_minute("gem_name, count(*) as downloads").group(1, 2)
  end

  # Per-minute download counts per gem name and version.
  scope :versions_per_minute, -> do
    per_minute("gem_name, gem_version, count(*) as downloads").group(1, 2, 3)
  end

  # Builds a read-only model class backed by the continuous aggregate view
  # "downloads_<view_name>".
  cagg = lambda do |view_name|
    Class.new(ActiveRecord::Base) do
      self.table_name = "downloads_#{view_name}"

      # Re-buckets the view's +ts+ into +range+ intervals. +query+ is a key of
      # Download::QUERIES or a raw SELECT fragment.
      scope :rollup, ->(range = '1d', query = :sum_downloads) do
        select("time_bucket('#{range}', ts) as ts, #{QUERIES[query] || query}")
          .group(1)
      end

      scope :per_hour, ->(query = :sum_downloads) { rollup('1h', query) }
      scope :per_day, ->(query = :sum_downloads) { rollup('1d', query) }
      scope :per_week, ->(query = :sum_downloads) { rollup('1w', query) }
      scope :per_month, ->(query = :sum_downloads) { rollup('1mon', query) }
      scope :per_year, ->(query = :sum_downloads) { rollup('1y', query) }

      def readonly?
        true
      end

      # Refreshes this continuous aggregate over its whole time range
      # (both window bounds are NULL).
      def self.refresh!
        ActiveRecord::Base.connection.execute(<<~SQL)
          CALL refresh_continuous_aggregate('#{table_name}', null, null);
        SQL
      end
    end
  end

  PerMinute = cagg['per_minute']
  PerHour = cagg['per_hour']
  PerDay = cagg['per_day']
  PerMonth = cagg['per_month']

  GemsPerMinute = cagg['gems_per_minute']
  GemsPerHour = cagg['gems_per_hour']
  GemsPerDay = cagg['gems_per_day']
  GemsPerMonth = cagg['gems_per_month']

  VersionsPerMinute = cagg['versions_per_minute']
  VersionsPerHour = cagg['versions_per_hour']
  VersionsPerDay = cagg['versions_per_day']
  VersionsPerMonth = cagg['versions_per_month']

  StatsAggPerMinute = cagg['stats_agg_per_minute']
  StatsAggPerHour = cagg['stats_agg_per_hour']
  StatsAggPerDay = cagg['stats_agg_per_day']
  StatsAggPerMonth = cagg['stats_agg_per_month']
end

# Connect to the database; fail fast if DATABASE_URL is not set.
ActiveRecord::Base.establish_connection(ENV.fetch('DATABASE_URL'))

# (Re)create the hypertable and its hierarchy of continuous aggregates from
# scratch. Runs in the connection's context so the schema statements can be
# called directly.
ActiveRecord::Base.connection.instance_exec do
  ActiveRecord::Base.logger = Logger.new($stdout)

  # Coarser views are built on finer ones, so drop from the top down.
  %w[month day hour minute].each do |frame|
    [
      "downloads_per_#{frame}",
      "downloads_gems_per_#{frame}",
      "downloads_versions_per_#{frame}",
      "downloads_stats_agg_per_#{frame}"
    ].each do |view|
      execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade")
    end
  end
  drop_table(:downloads, force: :cascade) if Download.table_exists?

  hypertable_options = {
    time_column: 'ts',
    chunk_time_interval: '1 day',
    compress_segmentby: 'gem_name, gem_version',
    compress_orderby: 'ts DESC',
    compression_interval: '7 days'
  }

  create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
    t.timestamptz :ts, null: false
    t.text :gem_name, :gem_version, null: false
    t.jsonb :payload
  end

  # Each level above the minute views reads the level below it, so download
  # counts must be summed from that view's +downloads+ column (count(*) would
  # count source buckets), and per-gem/per-version grouping is carried up.
  {
    per_minute: Download.per_minute,
    per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1),
    per_day: Download::PerHour.per_day(:sum_downloads).group(1),
    per_month: Download::PerDay.per_month(:sum_downloads).group(1),

    gems_per_minute: Download.gems_per_minute,
    gems_per_hour: Download::GemsPerMinute.per_hour(:sum_downloads_per_gem).group(1, 2),
    gems_per_day: Download::GemsPerHour.per_day(:sum_downloads_per_gem).group(1, 2),
    gems_per_month: Download::GemsPerDay.per_month(:sum_downloads_per_gem).group(1, 2),

    versions_per_minute: Download.versions_per_minute,
    versions_per_hour: Download::VersionsPerMinute.per_hour(:sum_downloads_per_version).group(1, 2, 3),
    versions_per_day: Download::VersionsPerHour.per_day(:sum_downloads_per_version).group(1, 2, 3),
    versions_per_month: Download::VersionsPerDay.per_month(:sum_downloads_per_version).group(1, 2, 3),

    stats_agg_per_minute: Download.stats_agg_per_minute,
    stats_agg_per_hour: Download::StatsAggPerMinute.per_hour(:rollup_per_version).group(1, 2, 3),
    stats_agg_per_day: Download::StatsAggPerHour.per_day(:rollup_per_version).group(1, 2, 3),
    stats_agg_per_month: Download::StatsAggPerDay.per_month(:rollup_per_version).group(1, 2, 3)
  }.each do |name, scope|
    puts "Creating continuous aggregate #{name}", scope.to_sql
    # "per_hour" -> "hour": the refresh policy follows the view's bucket size.
    frame = name.to_s.split('per_').last
    create_continuous_aggregate(
      "downloads_#{name}",
      scope.to_sql,
      refresh_policies: {
        schedule_interval: "INTERVAL '1 #{frame}'",
        start_offset: "INTERVAL '3 #{frame}'",
        end_offset: "INTERVAL '1 minute'"
      }
    )
  end
end

ActiveRecord::Base.logger = nil

# Matches ".../gems/<name>-<version>.gem"; the version is the part after the
# last "-" that is followed by a digit.
PATH_PATTERN = %r{/gems/(?<gem_name>.*)-(?<gem_version>\d+.*)\.gem}

# Number of parsed rows buffered before each bulk insert.
BATCH_SIZE = 5000

# Parses an access log and bulk-inserts one Download row per successful gem
# download, in batches of BATCH_SIZE.
#
# Whitespace-split fields used: 3 (client IP), 4..9 (timestamp), 10 (request
# path), 11 (response code) and 12.. (user agent, see #parse_env).
#
# @param file [String] path to the log file
def parse_file(file)
  downloads = []
  File.foreach(file) do |log_line|
    fragments = log_line.split
    path, response_code = fragments[10, 2]

    # Only count successful downloads.
    # NB: we consider a 304 response a download attempt
    next unless [200, 304].include?(response_code.to_i)

    m = path.match(PATH_PATTERN)
    # Requests for anything but a .gem file have no version for the NOT NULL
    # gem_version column; skip them instead of crashing on a nil match.
    next unless m

    downloads << {
      ts: Time.parse(fragments[4..9].join(' ')),
      gem_name: m[:gem_name],
      gem_version: m[:gem_version],
      payload: { ip: fragments[3], env: parse_env(fragments[12..]) }
    }

    if downloads.size >= BATCH_SIZE
      insert_downloads(downloads)
      downloads.clear
    end
  end
  insert_downloads(downloads) if downloads.any?
end

# Parses the user-agent fields of a log line into a Hash of
# component => version (string keys and values).
#
#   parse_env(%w[bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0])
#   # => {"bundler" => "2.5.9", "rubygems" => "3.3.25", "ruby" => "3.1.0"}
#
# A single word without a version maps to "true":
#
#   parse_env(%w[jruby]) # => {"jruby" => "true"}
#
# Everything from "command" onwards, parenthesised text and the literal
# "Ruby, " are removed first. Missing, blank or "(null)" input yields {}.
#
# @param output [Array<String>, nil] whitespace-split user-agent fields
# @return [Hash{String => String}]
def parse_env(output)
  env = Array(output).join(' ').gsub(/command.*|\(.*\)|Ruby, /, '').strip
  return {} if env.empty? || env == '(null)'

  env.split.to_h do |info|
    pair = info.split(%r{/|-}, 2)
    pair << 'true' if pair.size == 1
    pair
  end
end

# Inserts the buffered rows with a single multi-row INSERT (bulk_insert gem).
def insert_downloads(downloads)
  Download.bulk_insert values: downloads
end

# Log files to load: paths given on the command line, or the sample file.
s3_files = ARGV.empty? ? ["2024-04-26T00_15_00.000-szTpTn9sP1-116Ajwl4N.log"] : ARGV.dup

# Refresh order matters: every view must be refreshed after the finer view it
# is built on.
views_to_refresh = %w[
  PerMinute GemsPerMinute VersionsPerMinute StatsAggPerMinute
  PerHour GemsPerHour VersionsPerHour StatsAggPerHour
  PerDay GemsPerDay VersionsPerDay StatsAggPerDay
  PerMonth GemsPerMonth VersionsPerMonth StatsAggPerMonth
]

Benchmark.bm do |x|
  s3_files.each do |file|
    x.report "parse and load #{file}" do
      parse_file(file)
    end

    views_to_refresh.each do |view|
      x.report "Refresh #{view}" do
        Download.const_get(view).refresh!
      end
    end
  end
end

require "pry"; binding.pry

=begin
Download::PerHour.first
Download::GemsPerHour.all
Download::VersionsPerHour.where(gem_name: "rails").pluck(:gem_version, :downloads)
# => [["1.2.3.4", 6], ["6.1.7", 1], ["7.0.2", 1]]
=end