diff --git a/Gemfile b/Gemfile
index e3a71e1..34ba75c 100644
--- a/Gemfile
+++ b/Gemfile
@@ -107,3 +107,4 @@ gem "pundit"
 gem "rack-canonical-host"
 gem "ruby-readability"
 gem "rinku", require: "rails_rinku"
+gem "virtus"
diff --git a/app/controllers/articles_controller.rb b/app/controllers/articles_controller.rb
index 92e0108..f5b7d45 100644
--- a/app/controllers/articles_controller.rb
+++ b/app/controllers/articles_controller.rb
@@ -9,7 +9,17 @@ def show
 
   def click
     @article = Article.find(params[:id])
-    # TODO: Store the click in the database
+
+    ErrorReporter.rescue_and_report do
+      # TODO: Batch these in memory at some point and insert in batches.
+      Click.create({
+        article_id: @article.id,
+        user_id: current_user&.id,
+        ip: request.remote_ip,
+        created_at: Time.zone.now,
+      })
+    end
+
     redirect_to @article.url, allow_other_host: true
   end
 end
diff --git a/app/jobs/click_hour_aggregate_job.rb b/app/jobs/click_hour_aggregate_job.rb
new file mode 100644
index 0000000..5a92aa8
--- /dev/null
+++ b/app/jobs/click_hour_aggregate_job.rb
@@ -0,0 +1,11 @@
+class ClickHourAggregateJob < ApplicationJob
+  queue_as :aggregates
+
+  def perform
+    now = Time.now.utc
+    # TODO: Store last successful aggregation and aggregate from that point
+    # instead of past few hours. Then use the last aggregation time to determine
+    # whether to use live or aggregate data in the click count for articles.
+    ClickHourAggregate.rollup(from: now - 2.hours, to: now)
+  end
+end
diff --git a/app/jobs/clicks_maintenance_job.rb b/app/jobs/clicks_maintenance_job.rb
new file mode 100644
index 0000000..379b476
--- /dev/null
+++ b/app/jobs/clicks_maintenance_job.rb
@@ -0,0 +1,13 @@
+class ClicksMaintenanceJob < ApplicationJob
+  queue_as :maintenance
+
+  def perform
+    ErrorReporter.rescue_and_report do
+      Click.partition.premake(3)
+    end
+
+    ErrorReporter.rescue_and_report do
+      Click.partition.retain(14)
+    end
+  end
+end
diff --git a/app/models/article.rb b/app/models/article.rb
index 1d91d06..d3c3b81 100644
--- a/app/models/article.rb
+++ b/app/models/article.rb
@@ -1,4 +1,7 @@
 class Article < ApplicationRecord
+  # The time at which we started tracking and aggregating clicks.
+  AGGREGATION_START_TIME = Time.utc(2022, 9, 26, 11).freeze
+
   belongs_to :site
 
   scope :by_published_at, -> { order(published_at: :desc) }
@@ -6,4 +9,51 @@ class Article < ApplicationRecord
   validates :title, presence: true
   validates :url, presence: true, uniqueness: true, format: { with: URI::regexp(%w{http https}) }
   validates :published_at, presence: true
+
+  # The aggregated data for this article (click count and latest timestamp).
+  def aggregated_data
+    return @aggregated_data if defined?(@aggregated_data)
+
+    current_hour = Time.now.utc.beginning_of_hour
+    sql = SQL.new <<~SQL.squish, article_id: id, current_hour: current_hour
+      SELECT
+        sum(count) as count,
+        max(ts) as max_ts
+      FROM #{ClickHourAggregate.table_name}
+      WHERE
+        article_id = :article_id AND
+        ts < :current_hour
+    SQL
+
+    row = sql.hash_results.first || {}
+    max_ts = row["max_ts"]
+
+    @aggregated_data = {
+      count: row["count"] || 0,
+      max_ts: max_ts,
+    }
+  end
+
+  # The count of clicks that have been aggregated.
+  def aggregated_count
+    aggregated_data.fetch(:count)
+  end
+
+  # The maximum timestamp of any aggregated data.
+  def aggregated_max_ts
+    aggregated_data.fetch(:max_ts)
+  end
+
+  # Sums up counts that have not been aggregated yet.
+  def live_count
+    return @live_count if defined?(@live_count)
+
+    start = (aggregated_max_ts ? aggregated_max_ts + 1.hour : AGGREGATION_START_TIME)
+    @live_count = Click.count(article_id: id, created_at: (start..Time.now.utc))
+  end
+
+  # Sums up aggregated counts and live counts that have not been aggregated yet.
+  def click_count
+    aggregated_count + live_count
+  end
 end
diff --git a/app/models/click.rb b/app/models/click.rb
new file mode 100644
index 0000000..bbf01e1
--- /dev/null
+++ b/app/models/click.rb
@@ -0,0 +1,182 @@
+class Click
+  class Row
+    include Virtus.value_object
+
+    attribute :user_id, Integer
+    attribute :article_id, Integer
+    attribute :ip, String
+    attribute :created_at, DateTime
+
+    delegate :site, to: :article
+
+    def article
+      return if article_id.blank?
+      return @article if defined?(@article)
+
+      @article = Article.find_by_id(article_id)
+    end
+
+    def user
+      return if user_id.blank?
+      return @user if defined?(@user)
+
+      @user = User.find_by_id(user_id)
+    end
+  end
+
+  # Name of the parent table that the partitions inherit from.
+  def self.table_name
+    "clicks".freeze
+  end
+
+  # Build a partition instance for a given time.
+  def self.partition(time = Time.now.utc)
+    PartitionByDay.new(table_name, time)
+  end
+
+  # Public: Create one or more clicks from an Array of Hashes.
+  #
+  # Note: This method assumes the partition is already created.
+  # Use partition(time).create if it does not exist.
+  # In production, jobs should automatically create new partitions.
+  #
+  # Returns Result.
+  def self.create_many(*rows)
+    Result.new {
+      insert_rows = rows.flatten.map do |row|
+        [
+          row[:user_id].presence || SQL::NULL,
+          row[:article_id],
+          row[:ip],
+          row.fetch(:created_at).utc,
+        ]
+      end
+
+      SQL.run <<~SQL.squish, rows: SQL::ROWS(insert_rows)
+        INSERT INTO #{table_name} (user_id, article_id, ip, created_at)
+        VALUES :rows
+      SQL
+
+      nil
+    }
+  end
+
+  # Public: Create a click from a Hash.
+  #
+  # Note: This method assumes the partition is already created.
+  # Use partition(time).create if it does not exist.
+  # In production, jobs should automatically create new partitions.
+  #
+  # Returns Result.
+  def self.create(attributes = {})
+    create_many([attributes])
+  end
+
+  # TODO: See how to use pagy for this pagination instead of custom.
+  #
+  # Public: Paginate clicks.
+  #
+  # page - The Integer page (default: 1).
+  # per_page - The Integer per_page (default: 20).
+  # site_id - The Integer site_id to filter returned clicks for.
+  # user_id - The Integer user_id to filter returned clicks for.
+  #
+  # Returns PaginateResponse.
+  def self.paginate(page: 1, per_page: 20, site_id: nil, user_id: nil, article_id: nil, created_at: nil)
+    page ||= 1
+    per_page ||= 20
+    page = page.to_i
+    per_page = per_page.to_i
+
+    raise ArgumentError, "page must be >= 1 (was #{page})" unless page >= 1
+    raise ArgumentError, "per_page must be >= 1 (was #{per_page})" unless per_page >= 1
+
+    limit = per_page + 1
+    offset = (page - 1) * per_page
+
+    sql = build_sql(
+      select: "#{table_name}.*",
+      site_id: site_id,
+      user_id: user_id,
+      article_id: article_id,
+      created_at: created_at
+    )
+    sql.add "ORDER BY created_at DESC"
+    sql.add "LIMIT :limit OFFSET :offset", limit: limit, offset: offset
+
+    PaginateResponse.new({
+      page: page,
+      per_page: per_page,
+      has_next_page: sql.hash_results.slice!(per_page, 1).present?,
+      rows: sql.hash_results.map { |row| Row.new(row) },
+    })
+  end
+
+  # Private: Count the number of clicks. Only use this in tests.
+  #
+  # Returns Integer number of clicks.
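+  #
+  # Example (illustrative, assuming an article with id 1 exists):
+  #
+  #   Click.count(article_id: 1, created_at: (2.hours.ago.utc..Time.now.utc))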
+  def self.count(article_id: nil, site_id: nil, user_id: nil, created_at: nil)
+    build_sql(
+      select: "COUNT(*)",
+      article_id: article_id,
+      site_id: site_id,
+      user_id: user_id,
+      created_at: created_at
+    ).value
+  end
+
+  # Private: Return the last click row. Only use this in tests.
+  #
+  # Returns a Row if found else nil.
+  def self.last
+    rows = SQL.hash_results <<~SQL.squish
+      SELECT * FROM #{table_name} ORDER BY created_at DESC LIMIT 1
+    SQL
+
+    if rows.size == 1
+      Row.new(rows[0])
+    else
+      nil
+    end
+  end
+
+  # Private: Build a SQL query for clicks that can filter based on article,
+  # site and user.
+  def self.build_sql(select: "*", article_id: nil, site_id: nil, user_id: nil, created_at: nil)
+    sql = SQL.new("SELECT #{select} FROM #{table_name}")
+    sql.add "INNER JOIN articles a ON a.id = #{table_name}.article_id INNER JOIN sites s ON s.id = a.site_id" if site_id.present?
+    sql.add "INNER JOIN users u ON u.id = #{table_name}.user_id" if user_id.present?
+
+    fragments = []
+    binds = {}
+
+    if user_id.present?
+      fragments << "u.id = :user_id"
+      binds[:user_id] = user_id
+    end
+
+    if site_id.present?
+      fragments << "s.id = :site_id"
+      binds[:site_id] = site_id
+    end
+
+    if article_id.present?
+      fragments << "#{table_name}.article_id = :article_id"
+      binds[:article_id] = article_id
+    end
+
+    if created_at.present?
+      fragments << "#{table_name}.created_at >= :from AND #{table_name}.created_at <= :to"
+      binds[:from] = created_at.first
+      binds[:to] = created_at.last
+    end
+
+    if fragments.any?
+      sql.add "WHERE"
+      sql.add fragments.join(" AND "), binds
+    end
+
+    sql
+  end
+  class << self; private :build_sql; end
+end
diff --git a/app/models/click_hour_aggregate.rb b/app/models/click_hour_aggregate.rb
new file mode 100644
index 0000000..f1509a6
--- /dev/null
+++ b/app/models/click_hour_aggregate.rb
@@ -0,0 +1,32 @@
+class ClickHourAggregate < ApplicationRecord
+  def self.rollup(from:, to:)
+    raise ArgumentError, "from is required" if from.blank?
+    raise ArgumentError, "to is required" if to.blank?
+    raise ArgumentError, "from must be less than to" unless from < to
+
+    binds = {
+      from: from,
+      to: to,
+      insert_table: SQL::LITERAL(table_name),
+      select_table: SQL::LITERAL(Click.table_name),
+    }
+
+    SQL.run <<~SQL.squish, binds
+      INSERT INTO :insert_table (article_id, ts, count)
+      SELECT
+        article_id,
+        date_trunc('hour', created_at) AS ts,
+        count(*) as count
+      FROM :select_table
+      WHERE
+        article_id IS NOT NULL AND
+        created_at BETWEEN :from AND :to
+      GROUP BY 1, 2
+      ON CONFLICT (article_id, ts)
+      DO UPDATE SET
+        count = EXCLUDED.count
+    SQL
+  end
+
+  belongs_to :article
+end
diff --git a/app/models/paginate_response.rb b/app/models/paginate_response.rb
new file mode 100644
index 0000000..63ecd6f
--- /dev/null
+++ b/app/models/paginate_response.rb
@@ -0,0 +1,54 @@
+class PaginateResponse
+  include Enumerable
+  include Virtus.value_object
+
+  attribute :page, Integer
+  attribute :per_page, Integer
+  attribute :has_next_page, Boolean
+  attribute :rows, Array
+
+  def next_page
+    has_next_page? ? page + 1 : nil
+  end
+
+  def next_page?
+    has_next_page?
+  end
+
+  def prev_page
+    prev_page? ? page - 1 : nil
+  end
+
+  def prev_page?
+    page > 1
+  end
+
+  def length
+    rows.length
+  end
+  alias size length
+
+  def empty?
+    rows.empty?
+  end
+
+  def to_ary
+    rows
+  end
+
+  def [](idx)
+    rows[idx]
+  end
+
+  def last(n = nil)
+    n ? rows.last(n) : rows.last
+  end
+
+  def each(&block)
+    if block_given?
+      rows.each(&block)
+    else
+      rows.to_enum { @rows.size }
+    end
+  end
+end
diff --git a/app/models/partition_by_day.rb b/app/models/partition_by_day.rb
new file mode 100644
index 0000000..27d6c31
--- /dev/null
+++ b/app/models/partition_by_day.rb
@@ -0,0 +1,254 @@
+# https://www.postgresql.org/docs/13/ddl-partitioning.html#DDL-PARTITIONING-DECLARATIVE
+class PartitionByDay
+  class Row
+    include Virtus.model
+
+    attribute :name, String
+    attribute :expression, String
+  end
+
+  def self.validate_table(table:)
+    raise ArgumentError, "table cannot be blank" if table.blank?
+    raise ArgumentError, "table must be a String" unless table.is_a?(String)
+
+    table
+  end
+
+  def self.validate_name(table:, name:)
+    validate_table(table: table)
+
+    raise ArgumentError, "name must be a String" unless name.is_a?(String)
+    unless name.starts_with?(table)
+      raise ArgumentError, "name (#{name}) must start with table (#{table})"
+    end
+    unless name =~ /_\d{4}_\d{2}_\d{2}$/
+      raise ArgumentError, "name must end with yyyy_mm_dd but does not (#{name})"
+    end
+
+    name
+  end
+
+  def self.validate_from(from:)
+    raise ArgumentError, "from must not be nil" if from.nil?
+
+    from
+  end
+
+  def self.validate_to(to:)
+    raise ArgumentError, "to must not be nil" if to.nil?
+
+    to
+  end
+
+  def self.validate_number(number:)
+    raise ArgumentError, "number must not be nil" if number.nil?
+    unless number >= 2
+      raise ArgumentError, "number should be at least 2 or what's the point"
+    end
+
+    number
+  end
+
+  # Fetch all partitions for a given table.
+  def self.all(table:)
+    validate_table(table: table)
+
+    rows = SQL.hash_results <<-SQL.squish, table: table
+      SELECT pg_class.relname AS name,
+             pg_get_expr(pg_class.relpartbound, pg_class.oid, true) AS expression
+      FROM pg_class base_tb
+      JOIN pg_inherits ON pg_inherits.inhparent = base_tb.oid
+      JOIN pg_class ON pg_class.oid = pg_inherits.inhrelid
+      WHERE base_tb.oid = :table::regclass;
+    SQL
+
+    rows.map { |row| Row.new(row) }
+  end
+
+  # Generate a partition name based on table and from time.
+  #
+  # table - The String name of the source table.
+  # from - The Time of the new partition.
+  #
+  # Returns String partition name.
+  def self.name(table:, from:)
+    validate_table(table: table)
+    validate_from(from: from)
+
+    "#{table}_%d_%02d_%02d" % [from.year, from.month, from.day]
+  end
+
+  # Create new partition for provided table.
+  #
+  # table - The String name of the source table.
+  # name - The String name of the new partition.
+  # from - The Time to start the range of the partition.
+  # to - The Time to end the range of the partition.
+  #
+  # Returns nothing.
+  # Raises if anything goes wrong.
+  def self.create(table:, name:, from:, to:)
+    validate_name(table: table, name: name)
+    validate_from(from: from)
+    validate_to(to: to)
+
+    binds = {
+      table: SQL::LITERAL(table),
+      name: SQL::LITERAL(name),
+      from: from,
+      to: to,
+    }
+
+    SQL.run <<~SQL.squish, binds
+      CREATE TABLE IF NOT EXISTS :name
+      PARTITION OF :table FOR VALUES FROM (:from) TO (:to)
+    SQL
+
+    nil
+  end
+
+  # Premake several partitions from a given time. Also tries to create a
+  # partition for the from time so sometimes you ask for 3 partitions but get 4
+  # if the partition does not exist for the provided time.
+  #
+  # table - The String name of the source table.
+  # from - The Time to start premaking partitions from.
+  # number - The Integer number of partitions to create.
+  #
+  # Returns nothing.
+  # Raises if anything goes wrong.
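+  #
+  # Example (illustrative): running this on 2022-09-26 with the defaults would
+  # create clicks_2022_09_26 through clicks_2022_09_29 if they do not exist.
+  #
+  #   PartitionByDay.premake(table: "clicks", number: 3)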
+  def self.premake(table:, from: Time.now.utc, number: 3)
+    validate_table(table: table)
+    validate_from(from: from)
+    validate_number(number: number)
+
+    start = from.to_date
+    stop = start + number
+
+    (start..stop).each do |date|
+      new(table, date).create
+    end
+
+    nil
+  end
+
+  # Retain a given number of partitions and detach + drop the rest.
+  #
+  # table - The String name of the source table.
+  # from - The Time to determine retention from.
+  # number - The Integer number of days of partitions to retain prior to the from time.
+  #
+  # Returns nothing.
+  # Raises if anything goes wrong.
+  def self.retain(table:, from: Time.now.utc, number: 14)
+    validate_table(table: table)
+    validate_from(from: from)
+    validate_number(number: number)
+
+    date = from.to_date - number
+    binds = {
+      relname_pattern: "#{table}_%",
+      max_relname: name(table: table, from: date),
+    }
+    prunable = SQL.values <<~SQL.squish, binds
+      SELECT relname
+      FROM pg_class c
+      JOIN pg_namespace n ON n.oid = c.relnamespace
+      WHERE nspname = 'public' AND
+        relname LIKE :relname_pattern AND
+        relkind = 'r' AND
+        relname <= :max_relname
+      ORDER BY relname
+    SQL
+
+    prunable.each { |name|
+      detach(table: table, name: name)
+      drop(table: table, name: name)
+    }
+
+    nil
+  end
+
+  # Drops a partition table.
+  #
+  # table - The String name of the source table.
+  # name - The String name of the partition.
+  #
+  # Returns nothing.
+  # Raises if anything goes wrong.
+  def self.drop(table:, name:)
+    validate_name(table: table, name: name)
+
+    SQL.run <<~SQL.squish, name: SQL::LITERAL(name)
+      DROP TABLE IF EXISTS :name
+    SQL
+
+    nil
+  end
+
+  # Detaches a partition from a table. Once detached you can do whatever with it
+  # and it won't show up in query results.
+  #
+  # table - The String name of the source table.
+  # name - The String name of the partition.
+  #
+  # Returns nothing.
+  # Raises if anything goes wrong.
+  def self.detach(table:, name:)
+    validate_name(table: table, name: name)
+
+    SQL.run <<~SQL.squish, table: SQL::LITERAL(table), name: SQL::LITERAL(name)
+      ALTER TABLE IF EXISTS :table DETACH PARTITION :name;
+    SQL
+
+    nil
+  end
+
+  def self.exists?(name)
+    raise ArgumentError, "name can't be blank" if name.blank?
+
+    ActiveRecord::Base.connection.table_exists?(name)
+  end
+  class << self; alias exist? exists?; end
+
+  attr_reader :from, :to, :table, :name
+
+  def initialize(table, from)
+    self.class.validate_table(table: table)
+    self.class.validate_from(from: from)
+
+    @from = from.to_time.utc.beginning_of_day
+    @to = @from + 1.day
+    @table = table
+    @name = self.class.name(table: @table, from: @from)
+  end
+
+  def create
+    self.class.create(table: @table, name: @name, from: @from, to: @to)
+  end
+
+  def premake(number)
+    self.class.premake(table: @table, from: @from, number: number)
+  end
+
+  def retain(number)
+    self.class.retain(table: @table, from: @from, number: number)
+  end
+
+  def detach
+    self.class.detach(table: @table, name: @name)
+  end
+
+  def drop
+    self.class.drop(table: @table, name: @name)
+  end
+
+  def exists?
+    self.class.exists?(@name)
+  end
+  alias :exist? :exists?
+
+  def all
+    self.class.all(table: @table)
+  end
+end
diff --git a/app/views/articles/index.html.erb b/app/views/articles/index.html.erb
index 15d7ea1..b0207c5 100644
--- a/app/views/articles/index.html.erb
+++ b/app/views/articles/index.html.erb
@@ -13,6 +13,7 @@