Skip to content

Instantly share code, notes, and snippets.

@exAspArk
Last active July 21, 2022 08:46
Show Gist options
  • Save exAspArk/c325bb9a75dcda5c8212 to your computer and use it in GitHub Desktop.
Save exAspArk/c325bb9a75dcda5c8212 to your computer and use it in GitHub Desktop.

Revisions

  1. exAspArk revised this gist May 29, 2015. 1 changed file with 4 additions and 3 deletions.
    7 changes: 4 additions & 3 deletions gistfile1.rb
    Original file line number Diff line number Diff line change
    @@ -1,10 +1,11 @@
    # The same algorithm which is used in Google Analytics
    # https://support.google.com/analytics/answer/2731565?hl=en
    #
    # The same algorithm which is used in Google Analytics (https://support.google.com/analytics/answer/2731565?hl=en):
    # Time-based expiry (including end of day):
    # - After 30 minutes of inactivity
    # - At midnight

    # Enable dynamic scripting for Groovy (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting.html#_enabling_dynamic_scripting)
    # ! WARNING: please read about security first

    # Usage:
    #
    # > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute
  2. exAspArk created this gist May 29, 2015.
    165 changes: 165 additions & 0 deletions gistfile1.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,165 @@
    # The same algorithm which is used in Google Analytics
    # https://support.google.com/analytics/answer/2731565?hl=en
    #
    # Time-based expiry (including end of day):
    # - After 30 minutes of inactivity
    # - At midnight

    # Usage:
    #
    # > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute
    #
    # {
    # 1 => [
    # {
    # day: Thu, 28 May 2015,
    # sum: 125,
    # average: 25,
    # median: 20
    # }
    # ]
    # }

    # Computes per-user, per-day reading-session statistics from Elasticsearch,
    # following the Google Analytics session model: a session ends after 30
    # minutes of inactivity, and the daily date_histogram buckets implicitly
    # end every session at midnight.
    #
    # Usage:
    #   Elastic::CalcUserSessionsService.new(
    #     from_date: 1.day.ago, to_date: Time.current, user_ids: [1]
    #   ).execute
    #   # => { 1 => [{ day: <Date>, sum: 125, average: 25, median: 20 }] }
    #
    # NOTE: relies on dynamic Groovy scripting being enabled in Elasticsearch
    # (a 1.x/2.x feature) and on ActiveSupport (`blank?`, `squish`).
    class Elastic::CalcUserSessionsService
      INDEX_NAME = "your_index".freeze
      TIME_FIELD = "read_at".freeze
      MILLISECONDS_IN_SEC = 1_000
      MILLISECONDS_IN_MIN = 60 * MILLISECONDS_IN_SEC
      INACTIVITY_DURATION_IN_MILLISECONDS = 30 * MILLISECONDS_IN_MIN

      attr_reader :params

      # @param params [Hash] :from_date, :to_date (range filter bounds) and
      #   :user_ids (Array of user ids to report on)
      def initialize(params)
        @params = params
      end

      # Runs the aggregation and reshapes the response.
      #
      # @return [Hash{Integer => Array<Hash>}] user_id => list of
      #   { day:, sum:, average:, median: } entries (session lengths in
      #   minutes); days without at least one complete session are skipped.
      def execute
        result = {}

        response = elastic_adapter.search(index: INDEX_NAME, body: query)

        response["aggregations"]["user_ids"]["buckets"].each do |user_bucket|
          user_sessions = result[user_bucket["key"]] = []

          user_bucket["by_days"]["buckets"].each do |session_bucket|
            sessions = session_bucket["sessions"]["value"]
            # The reduce script returns an empty map when a day has no
            # completed sessions — skip those buckets.
            next if sessions.blank?

            user_sessions << {
              # Histogram keys are epoch milliseconds for the bucket's day.
              day: Time.at(session_bucket["key"] / MILLISECONDS_IN_SEC).to_date,
              sum: sessions["sum"].to_i,
              average: sessions["average"].to_i,
              median: sessions["median"].to_i
            }
          end
        end

        result
      end

      # Builds the search body: a filtered query (ES 1.x syntax) restricting
      # by time range and user ids, then a terms -> date_histogram ->
      # scripted_metric aggregation pipeline that measures session lengths.
      def query
        {
          size: 0,
          query: {
            filtered: {
              filter: {
                bool: {
                  must: must_filters,
                  should: should_filters
                }
              }
            }
          },
          aggs: {
            user_ids: {
              terms: {
                field: "user_id"
              },
              aggs: {
                by_days: {
                  date_histogram: {
                    field: TIME_FIELD,
                    interval: "1d"
                  },
                  aggs: {
                    sessions: {
                      scripted_metric: {
                        # Collect every read timestamp per shard, then split
                        # them into sessions wherever the gap between
                        # consecutive reads exceeds the inactivity threshold.
                        # Zero-length sessions (a single isolated read) are
                        # dropped. Groovy source is kept verbatim; `oneliner`
                        # only normalizes whitespace.
                        init_script: "_agg['read_ats'] = []",
                        map_script: "_agg.read_ats.add(doc['#{ TIME_FIELD }'].value)",
                        combine_script: oneliner(%Q{
                          sessions = []
                          if (_agg.read_ats.size() < 2) {
                            return sessions
                          }
                          _agg.read_ats.sort()
                          session_started_at = _agg.read_ats[0]
                          previous_read_at = session_started_at
                          last_read_at = _agg.read_ats[-1]
                          for (read_at in _agg.read_ats[1..-1]) {
                            if (read_at - previous_read_at > #{ INACTIVITY_DURATION_IN_MILLISECONDS }) {
                              if (previous_read_at - session_started_at != 0) {
                                sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                              }
                              session_started_at = read_at
                            } else if (read_at == last_read_at && read_at - session_started_at != 0) {
                              sessions << (last_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                            }
                            previous_read_at = read_at
                          }
                          return sessions
                        }),
                        # Merge per-shard session lists and compute sum,
                        # average and median; returns an empty map when there
                        # are no sessions at all.
                        reduce_script: oneliner(%Q{
                          sessions = []
                          stats = [:]
                          for (shard_sessions in _aggs) { sessions.addAll(shard_sessions) }
                          session_count = sessions.size()
                          if (session_count == 0) { return stats }
                          sessions.sort()
                          median_session_position1 = (int)((session_count - 1) / 2)
                          median_session_position2 = (int)(session_count / 2)
                          stats.median = (sessions[median_session_position1] + sessions[median_session_position2]) / 2
                          stats.sum = sessions.sum()
                          stats.average = stats.sum / session_count
                          return stats
                        })
                      }
                    }
                  }
                }
              }
            }
          }
        }
      end

      private

      # Collapses a multi-line Groovy snippet to a single "a; b; c"-style line,
      # which is how inline scripts must be sent to Elasticsearch. Strips
      # outer whitespace, joins lines with "; ", then removes the separators
      # that would be illegal right after "{" or right before "}".
      def oneliner(code)
        code.gsub(/\A\s*|\s*\z/, "").gsub(/\n[ \n]*/, "; ").squish.gsub("{;", "{").gsub("; }", " }")
      end

      # Mandatory filter: only documents read inside the requested window.
      def must_filters
        [
          { range: { read_at: { gte: params[:from_date], lte: params[:to_date] } } }
        ]
      end

      # One term clause per requested user id (bool filter `should`: at least
      # one must match). `map` replaces the original `inject([]) { ... << }`,
      # which built the same array less idiomatically.
      def should_filters
        params[:user_ids].map { |user_id| { term: { user_id: user_id } } }
      end

      def elastic_adapter
        Elasticsearch::Client.new(hosts: ElasticSettings.hosts, log: ElasticSettings.log)
      end
    end