Last active
July 21, 2022 08:46
-
-
Save exAspArk/c325bb9a75dcda5c8212 to your computer and use it in GitHub Desktop.
Revisions
-
exAspArk revised this gist
May 29, 2015 . 1 changed file with 4 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,10 +1,11 @@ # The same algorithm which is used in Google Analytics (https://support.google.com/analytics/answer/2731565?hl=en): # Time-based expiry (including end of day): # - After 30 minutes of inactivity # - At midnight # Enable dynamic scripting for Groovy (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting.html#_enabling_dynamic_scripting) # ! WARNING: please read about security first # Usage: # # > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute -
exAspArk created this gist
May 29, 2015 .There are no files selected for viewing
# Computes per-user, per-day session statistics (sum, average and median
# session duration, in minutes) from timestamped events stored in Elasticsearch.
#
# The same algorithm which is used in Google Analytics
# (https://support.google.com/analytics/answer/2731565?hl=en):
#
# Time-based expiry (including end of day):
#   - After 30 minutes of inactivity
#   - At midnight
#
# Enable dynamic scripting for Groovy
# (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting.html#_enabling_dynamic_scripting)
# ! WARNING: please read about security first
#
# Usage:
#
#   > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute
#
#   {
#     1 => [
#       {
#         day: Thu, 28 May 2015,
#         sum: 125,
#         average: 25,
#         median: 20
#       }
#     ]
#   }
class Elastic::CalcUserSessionsService
  INDEX_NAME = "your_index".freeze
  TIME_FIELD = "read_at".freeze

  MILLISECONDS_IN_SEC = 1_000
  MILLISECONDS_IN_MIN = 60 * MILLISECONDS_IN_SEC
  INACTIVITY_DURATION_IN_MILLISECONDS = 30 * MILLISECONDS_IN_MIN

  attr_reader :params

  # @param params [Hash] reads :from_date and :to_date (bounds of the range
  #   filter on TIME_FIELD) and :user_ids (users to restrict the search to;
  #   nil or a single id is accepted and coerced to an array)
  def initialize(params)
    @params = params
  end

  # Runs the aggregation and reshapes the response.
  #
  # @return [Hash] user id => array of { day:, sum:, average:, median: } hashes,
  #   one per day bucket for which the scripted metric returned statistics
  def execute
    response = elastic_adapter.search(index: INDEX_NAME, body: query)

    response["aggregations"]["user_ids"]["buckets"].each_with_object({}) do |user_bucket, result|
      result[user_bucket["key"]] = daily_stats(user_bucket["by_days"]["buckets"])
    end
  end

  # Builds the search body: a filtered query restricted by date range and
  # users, aggregated by user, then by day, then reduced to session statistics
  # by a scripted metric.
  #
  # @return [Hash] request body for Elasticsearch::Client#search
  def query
    {
      size: 0, # only the aggregations are needed, not the hits
      query: {
        filtered: {
          filter: {
            bool: {
              must: must_filters,
              should: should_filters
            }
          }
        }
      },
      aggs: {
        user_ids: {
          terms: user_terms,
          aggs: {
            by_days: {
              # One bucket per day implements the "session ends at midnight" rule.
              date_histogram: { field: TIME_FIELD, interval: "1d" },
              aggs: {
                sessions: { scripted_metric: sessions_metric }
              }
            }
          }
        }
      }
    }
  end

  private

  # Converts the day buckets of one user into result rows, skipping days for
  # which the reduce script returned an empty stats map.
  def daily_stats(day_buckets)
    day_buckets.each_with_object([]) do |day_bucket, days|
      stats = day_bucket["sessions"]["value"]
      next if stats.blank?

      days << {
        # Bucket keys are epoch milliseconds of the bucket start; read them in
        # UTC so the date does not shift with the process-local time zone.
        day: Time.at(day_bucket["key"] / MILLISECONDS_IN_SEC).utc.to_date,
        sum: stats["sum"].to_i,
        average: stats["average"].to_i,
        median: stats["median"].to_i
      }
    end
  end

  # Scripts of the scripted_metric aggregation that turns raw timestamps into
  # session statistics.
  def sessions_metric
    {
      init_script: "_agg['read_ats'] = []",
      map_script: "_agg.read_ats.add(doc['#{ TIME_FIELD }'].value)",
      combine_script: combine_script,
      reduce_script: reduce_script
    }
  end

  # Groovy: splits the collected, sorted timestamps into sessions separated by
  # more than INACTIVITY_DURATION_IN_MILLISECONDS of inactivity and returns the
  # non-zero session durations in minutes. The still-open session is closed
  # once after the loop, so repeated identical final timestamps cannot emit the
  # same session more than once.
  # NOTE: do not put Groovy comments inside the script; `oneliner` joins all
  # lines and a `//` comment would swallow the rest of the script.
  # NOTE(review): the reduce script merges one list per shard, so events of one
  # user spread over several shards are presumably sessionized separately —
  # confirm against the index routing.
  def combine_script
    oneliner(%Q{
      sessions = []
      if (_agg.read_ats.size() < 2) { return sessions }

      _agg.read_ats.sort()
      session_started_at = _agg.read_ats[0]
      previous_read_at = session_started_at

      for (read_at in _agg.read_ats[1..-1]) {
        if (read_at - previous_read_at > #{ INACTIVITY_DURATION_IN_MILLISECONDS }) {
          if (previous_read_at - session_started_at != 0) {
            sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
          }
          session_started_at = read_at
        }
        previous_read_at = read_at
      }

      if (previous_read_at - session_started_at != 0) {
        sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
      }

      return sessions
    })
  end

  # Groovy: merges the per-shard session lists and computes median, sum and
  # average; returns an empty map when there are no sessions.
  def reduce_script
    oneliner(%Q{
      sessions = []
      stats = [:]
      for (shard_sessions in _aggs) { sessions.addAll(shard_sessions) }

      session_count = sessions.size()
      if (session_count == 0) { return stats }

      sessions.sort()
      median_session_position1 = (int)((session_count - 1) / 2)
      median_session_position2 = (int)(session_count / 2)
      stats.median = (sessions[median_session_position1] + sessions[median_session_position2]) / 2
      stats.sum = sessions.sum()
      stats.average = stats.sum / session_count

      return stats
    })
  end

  # Collapses a multi-line script into a single line: statements are joined
  # with "; " and the separators produced right after "{" and right before "}"
  # are removed again.
  def oneliner(code)
    code.
      gsub(/\A\s*|\s*\z/, "").
      gsub(/\n[ \n]*/, "; ").
      squish.
      gsub("{;", "{").
      gsub("; }", " }")
  end

  # Date range restriction; uses TIME_FIELD so the filter and the aggregation
  # always look at the same field.
  def must_filters
    [
      { range: { TIME_FIELD => { gte: params[:from_date], lte: params[:to_date] } } }
    ]
  end

  # One term filter per requested user.
  def should_filters
    user_ids.map { |user_id| { term: { user_id: user_id } } }
  end

  # Terms aggregation over users. The default bucket limit would silently drop
  # users beyond the first ten, so ask for as many buckets as users requested.
  def user_terms
    terms = { field: "user_id" }
    terms[:size] = user_ids.size unless user_ids.empty?
    terms
  end

  # Requested user ids as a duplicate-free array (empty when none were given).
  def user_ids
    Array(params[:user_ids]).uniq
  end

  def elastic_adapter
    @elastic_adapter ||= Elasticsearch::Client.new(hosts: ElasticSettings.hosts, log: ElasticSettings.log)
  end
end