|  |  | @@ -0,0 +1,165 @@ | 
    
    |  |  | # The same algorithm which is used in Google Analytics | 
    
    |  |  | # https://support.google.com/analytics/answer/2731565?hl=en | 
    
    |  |  | # | 
    
    |  |  | # Time-based expiry (including end of day): | 
    
    |  |  | # - After 30 minutes of inactivity | 
    
    |  |  | # - At midnight | 
    
    |  |  | 
 | 
    
    |  |  | # Usage: | 
    
    |  |  | # | 
    
    |  |  | # > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute | 
    
    |  |  | # | 
    
    |  |  | # { | 
    
    |  |  | #   1 => [ | 
    
    |  |  | #     { | 
    
    |  |  | #           day: Thu, 28 May 2015, | 
    
    |  |  | #           sum: 125, | 
    
    |  |  | #       average: 25, | 
    
    |  |  | #        median: 20 | 
    
    |  |  | #     } | 
    
    |  |  | #   ] | 
    
    |  |  | # } | 
    
    |  |  | 
 | 
    
    |  |  | class Elastic::CalcUserSessionsService | 
    
    |  |  | INDEX_NAME = "your_index".freeze | 
    
    |  |  | TIME_FIELD = "read_at".freeze | 
    
    |  |  | MILLISECONDS_IN_SEC = 1_000 | 
    
    |  |  | MILLISECONDS_IN_MIN = 60 * MILLISECONDS_IN_SEC | 
    
    |  |  | INACTIVITY_DURATION_IN_MILLISECONDS = 30 * MILLISECONDS_IN_MIN | 
    
    |  |  | 
 | 
    
    |  |  | attr_reader :params | 
    
    |  |  | 
 | 
    
    |  |  | def initialize(params) | 
    
    |  |  | @params = params | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def execute | 
    
    |  |  | result = {} | 
    
    |  |  | 
 | 
    
    |  |  | response = elastic_adapter.search(index: INDEX_NAME, body: query) | 
    
    |  |  | 
 | 
    
    |  |  | response["aggregations"]["user_ids"]["buckets"].each do |user_bucket| | 
    
    |  |  | result[user_bucket["key"]] = [] | 
    
    |  |  | 
 | 
    
    |  |  | user_bucket["by_days"]["buckets"].each do |session_bucket| | 
    
    |  |  | next if session_bucket["sessions"]["value"].blank? | 
    
    |  |  | 
 | 
    
    |  |  | result[user_bucket["key"]] << { | 
    
    |  |  | day:     Time.at(session_bucket["key"] / MILLISECONDS_IN_SEC).to_date, | 
    
    |  |  | sum:     session_bucket["sessions"]["value"]["sum"].to_i, | 
    
    |  |  | average: session_bucket["sessions"]["value"]["average"].to_i, | 
    
    |  |  | median:  session_bucket["sessions"]["value"]["median"].to_i | 
    
    |  |  | } | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | result | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def query | 
    
    |  |  | { | 
    
    |  |  | size: 0, | 
    
    |  |  | query: { | 
    
    |  |  | filtered: { | 
    
    |  |  | filter: { | 
    
    |  |  | bool: { | 
    
    |  |  | must: must_filters, | 
    
    |  |  | should: should_filters | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | }, | 
    
    |  |  | aggs: { | 
    
    |  |  | user_ids: { | 
    
    |  |  | terms: { | 
    
    |  |  | field: "user_id" | 
    
    |  |  | }, | 
    
    |  |  | aggs: { | 
    
    |  |  | by_days: { | 
    
    |  |  | date_histogram: { | 
    
    |  |  | field: TIME_FIELD, | 
    
    |  |  | interval: "1d" | 
    
    |  |  | }, | 
    
    |  |  | aggs: { | 
    
    |  |  | sessions: { | 
    
    |  |  | scripted_metric: { | 
    
    |  |  | init_script: "_agg['read_ats'] = []", | 
    
    |  |  | map_script: "_agg.read_ats.add(doc['#{ TIME_FIELD }'].value)", | 
    
    |  |  | combine_script: oneliner(%Q{ | 
    
    |  |  | sessions = [] | 
    
    |  |  | if (_agg.read_ats.size() < 2) { | 
    
    |  |  | return sessions | 
    
    |  |  | } | 
    
    |  |  |  | 
    
    |  |  | _agg.read_ats.sort() | 
    
    |  |  | session_started_at = _agg.read_ats[0] | 
    
    |  |  | previous_read_at = session_started_at | 
    
    |  |  | last_read_at = _agg.read_ats[-1] | 
    
    |  |  |  | 
    
    |  |  | for (read_at in _agg.read_ats[1..-1]) { | 
    
    |  |  | if (read_at - previous_read_at > #{ INACTIVITY_DURATION_IN_MILLISECONDS }) { | 
    
    |  |  | if (previous_read_at - session_started_at != 0) { | 
    
    |  |  | sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN } | 
    
    |  |  | } | 
    
    |  |  | session_started_at = read_at | 
    
    |  |  | } else if (read_at == last_read_at && read_at - session_started_at != 0) { | 
    
    |  |  | sessions << (last_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN } | 
    
    |  |  | } | 
    
    |  |  |  | 
    
    |  |  | previous_read_at = read_at | 
    
    |  |  | } | 
    
    |  |  |  | 
    
    |  |  | return sessions | 
    
    |  |  | }), | 
    
    |  |  | reduce_script: oneliner(%Q{ | 
    
    |  |  | sessions = [] | 
    
    |  |  | stats = [:] | 
    
    |  |  |  | 
    
    |  |  | for (shard_sessions in _aggs) { sessions.addAll(shard_sessions) } | 
    
    |  |  |  | 
    
    |  |  | session_count = sessions.size() | 
    
    |  |  |  | 
    
    |  |  | if (session_count == 0) { return stats } | 
    
    |  |  |  | 
    
    |  |  | sessions.sort() | 
    
    |  |  |  | 
    
    |  |  | median_session_position1 = (int)((session_count - 1) / 2) | 
    
    |  |  | median_session_position2 = (int)(session_count / 2) | 
    
    |  |  | stats.median = (sessions[median_session_position1] + sessions[median_session_position2]) / 2 | 
    
    |  |  | stats.sum = sessions.sum() | 
    
    |  |  | stats.average = stats.sum / session_count | 
    
    |  |  |  | 
    
    |  |  | return stats | 
    
    |  |  | }) | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | } | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | private | 
    
    |  |  | 
 | 
    
    |  |  | def oneliner(code) | 
    
    |  |  | code.gsub(/\A\s*|\s*\z/, "").gsub(/\n[ \n]*/, "; ").squish.gsub("{;", "{").gsub("; }", " }") | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def must_filters | 
    
    |  |  | [ | 
    
    |  |  | { range: { read_at: { gte: params[:from_date], lte: params[:to_date] } } } | 
    
    |  |  | ] | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def should_filters | 
    
    |  |  | params[:user_ids].inject([]) do |result, user_id| | 
    
    |  |  | result << { term: { user_id: user_id } } | 
    
    |  |  | end | 
    
    |  |  | end | 
    
    |  |  | 
 | 
    
    |  |  | def elastic_adapter | 
    
    |  |  | Elasticsearch::Client.new(hosts: ElasticSettings.hosts, log: ElasticSettings.log) | 
    
    |  |  | end | 
    
    |  |  | end |