# frozen_string_literal: true

# Calculates per-user, per-day session-duration statistics from Elasticsearch,
# using the same session-expiry algorithm as Google Analytics
# (https://support.google.com/analytics/answer/2731565?hl=en):
# - a session ends after 30 minutes of inactivity
# - a session ends at midnight (enforced by the 1-day date_histogram buckets)
#
# Requires dynamic scripting for Groovy to be enabled on the cluster
# (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting.html#_enabling_dynamic_scripting)
# ! WARNING: please read about security first
#
# Usage:
#
# > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute
#
# {
#   1 => [
#     {
#       day: Thu, 28 May 2015,
#       sum: 125,     # total session minutes for the day
#       average: 25,  # mean session length, minutes
#       median: 20    # median session length, minutes
#     }
#   ]
# }
class Elastic::CalcUserSessionsService
  INDEX_NAME = "your_index".freeze
  TIME_FIELD = "read_at".freeze

  MILLISECONDS_IN_SEC = 1_000
  MILLISECONDS_IN_MIN = 60 * MILLISECONDS_IN_SEC
  # A session is considered over after 30 minutes without activity.
  INACTIVITY_DURATION_IN_MILLISECONDS = 30 * MILLISECONDS_IN_MIN

  attr_reader :params

  # @param params [Hash] expects :from_date and :to_date (Time-like) and
  #   :user_ids [Array<Integer>]
  def initialize(params)
    @params = params
  end

  # Runs the aggregation and reshapes the Elasticsearch response into
  # { user_id => [{ day:, sum:, average:, median: }, ...] }.
  # Histogram buckets whose scripted metric produced no value are skipped.
  #
  # @return [Hash{Object => Array<Hash>}]
  def execute
    result = {}
    response = elastic_adapter.search(index: INDEX_NAME, body: query)

    response["aggregations"]["user_ids"]["buckets"].each do |user_bucket|
      result[user_bucket["key"]] = []

      user_bucket["by_days"]["buckets"].each do |session_bucket|
        # The reduce script returns an empty map when the user had no
        # measurable sessions in this bucket.
        next if session_bucket["sessions"]["value"].blank?

        result[user_bucket["key"]] << {
          # date_histogram keys are epoch milliseconds.
          day: Time.at(session_bucket["key"] / MILLISECONDS_IN_SEC).to_date,
          sum: session_bucket["sessions"]["value"]["sum"].to_i,
          average: session_bucket["sessions"]["value"]["average"].to_i,
          median: session_bucket["sessions"]["value"]["median"].to_i
        }
      end
    end

    result
  end

  # Builds the ES 1.x request body: a filtered query restricting by time
  # window and user ids, with a terms -> date_histogram -> scripted_metric
  # aggregation tree. The Groovy scripts collect read_at timestamps per
  # shard (map), turn them into session lengths in minutes (combine), and
  # compute sum/average/median across shards (reduce).
  def query
    {
      size: 0,
      query: {
        filtered: {
          filter: {
            bool: {
              must: must_filters,
              # NOTE(review): bool-filter `should` semantics vary across ES
              # versions when combined with `must` — confirm that at least
              # one user_id term is actually required on the target cluster.
              should: should_filters
            }
          }
        }
      },
      aggs: {
        user_ids: {
          terms: { field: "user_id" },
          aggs: {
            by_days: {
              date_histogram: { field: TIME_FIELD, interval: "1d" },
              aggs: {
                sessions: {
                  scripted_metric: {
                    # Per-shard accumulator of raw timestamps.
                    init_script: "_agg['read_ats'] = []",
                    map_script: "_agg.read_ats.add(doc['#{ TIME_FIELD }'].value)",
                    # Walk the sorted timestamps; a gap longer than the
                    # inactivity threshold closes the current session.
                    # Zero-length sessions are discarded.
                    combine_script: oneliner(%Q{
                      sessions = []

                      if (_agg.read_ats.size() < 2) {
                        return sessions
                      }

                      _agg.read_ats.sort()

                      session_started_at = _agg.read_ats[0]
                      previous_read_at = session_started_at
                      last_read_at = _agg.read_ats[-1]

                      for (read_at in _agg.read_ats[1..-1]) {
                        if (read_at - previous_read_at > #{ INACTIVITY_DURATION_IN_MILLISECONDS }) {
                          if (previous_read_at - session_started_at != 0) {
                            sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                          }

                          session_started_at = read_at
                        } else if (read_at == last_read_at && read_at - session_started_at != 0) {
                          sessions << (last_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                        }

                        previous_read_at = read_at
                      }

                      return sessions
                    }),
                    # Merge per-shard session lists, then compute
                    # sum / average / median over all sessions.
                    reduce_script: oneliner(%Q{
                      sessions = []
                      stats = [:]

                      for (shard_sessions in _aggs) {
                        sessions.addAll(shard_sessions)
                      }

                      session_count = sessions.size()

                      if (session_count == 0) {
                        return stats
                      }

                      sessions.sort()

                      median_session_position1 = (int)((session_count - 1) / 2)
                      median_session_position2 = (int)(session_count / 2)
                      stats.median = (sessions[median_session_position1] + sessions[median_session_position2]) / 2

                      stats.sum = sessions.sum()
                      stats.average = stats.sum / session_count

                      return stats
                    })
                  }
                }
              }
            }
          }
        }
      }
    }
  end

  private

  # Collapses a multi-line Groovy snippet into a single line, joining
  # statements with "; " (ES dynamic script bodies must be one-liners),
  # then removes the spurious semicolons introduced next to braces.
  #
  # FIX: the final pattern used to contain a literal newline ("; \n}"),
  # which could never match because `squish` has already collapsed all
  # whitespace — the "; }" cleanup was dead code.
  def oneliner(code)
    code.gsub(/\A\s*|\s*\z/, "").gsub(/\n[ \n]*/, "; ").squish.gsub("{;", "{").gsub("; }", " }")
  end

  # Restricts documents to the requested [from_date, to_date] window.
  def must_filters
    [
      {
        range: {
          read_at: {
            gte: params[:from_date],
            lte: params[:to_date]
          }
        }
      }
    ]
  end

  # One term filter per requested user id.
  def should_filters
    params[:user_ids].map { |user_id| { term: { user_id: user_id } } }
  end

  def elastic_adapter
    Elasticsearch::Client.new(hosts: ElasticSettings.hosts, log: ElasticSettings.log)
  end
end