Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save mcivorsteiner/02a624a56cbee9a4a96ac0e0a089855a to your computer and use it in GitHub Desktop.
Save mcivorsteiner/02a624a56cbee9a4a96ac0e0a089855a to your computer and use it in GitHub Desktop.
module Timber
module LogDevices
# A highly efficient log device that buffers and delivers log messages over HTTPS to
# the Timber API. It uses batches, keep-alive connections, and msgpack to deliver logs with
# high-throughput and little overhead.
#
# See {#initialize} for options and more details.
class HTTP
# @private
#
# In-memory buffer of log messages waiting to be batched into a request.
# Messages are appended with {#enqueue} and removed all at once with {#flush}.
# Every access to the backing array is serialized through a mutex, because the
# buffer is written by whichever thread is logging and drained by the interval
# flush thread.
class LogMsgQueue
  # @param max_size [Integer] number of buffered messages at which {#full?}
  #   starts returning true. The queue never rejects messages itself.
  def initialize(max_size)
    @lock = Mutex.new
    @max_size = max_size
    @array = []
  end

  # Appends a message to the buffer.
  #
  # @param msg [Object] the message to buffer.
  # @return [Array] the backing array (the value `Array#<<` returns).
  def enqueue(msg)
    log_debug_info("[LogMsgQueue#enqueue] Msg queue object id: #{object_id}")
    log_debug_info("[LogMsgQueue#enqueue] Array object id: #{@array.object_id}")
    @lock.synchronize { @array << msg }
  end

  # Removes and returns everything currently buffered.
  #
  # The copy and the clear happen under the same lock so a message enqueued
  # concurrently ends up either in this snapshot or in the next one, never lost.
  #
  # @return [Array] the drained messages, oldest first (empty if nothing was buffered).
  def flush
    log_debug_info("[LogMsgQueue#flush] Msg queue object id: #{object_id}")
    log_debug_info("[LogMsgQueue#flush] Array object id: #{@array.object_id}")
    @lock.synchronize do
      snapshot = @array.clone
      @array.clear
      snapshot
    end
  end

  # @return [Boolean] true once the number of buffered messages has reached `max_size`.
  def full?
    size >= @max_size
  end

  # @return [Integer] number of messages currently buffered.
  def size
    @lock.synchronize { @array.size }
  end

  private

  # Writes a diagnostic line to the configured debug logger, if there is one.
  # Called outside the lock so logging can never hold up producers.
  def log_debug_info(message)
    logger = Timber::Config.instance.debug_logger
    logger.info(message) if logger
  end
end
# Works like SizedQueue, but drops messages instead of blocking. Pass one of these in
# to {HTTP#initialize} via the :request_queue option if you'd prefer to drop messages
# in the event of a buffer overflow instead of applying back pressure.
class DroppingSizedQueue < SizedQueue
  # Adds +obj+ to the queue if there is room; otherwise discards it. Never blocks.
  #
  # Uses the parent's non-blocking push rather than reaching into SizedQueue's
  # instance variables, which are not available on the built-in implementation.
  #
  # @param obj [Object] the item to queue.
  # @return [Boolean] true if the item was queued, false if it was dropped
  #   because the queue was full.
  def push(obj)
    # A non-blocking push signals "queue full" by raising ThreadError.
    super(obj, true)
    true
  rescue ThreadError
    false
  end

  # SizedQueue's own #enq and #<< are aliases of the parent #push, so they would
  # still block. Re-point them at the dropping version.
  alias_method :enq, :push
  alias_method :<<, :push
end
# Default endpoint that log batches are POSTed to. Used by #initialize when neither
# the `:timber_url` option nor the `TIMBER_URL` environment variable is set.
TIMBER_URL = "https://logs.timber.io/frames".freeze
# Value of the Content-Type header set on every request; request bodies are
# produced with `to_msgpack`.
CONTENT_TYPE = "application/msgpack".freeze
# Value of the User-Agent header set on every request; embeds Timber::VERSION.
USER_AGENT = "Timber Ruby/#{Timber::VERSION} (HTTP)".freeze
# Instantiates a new HTTP log device that can be passed to {Timber::Logger#initialize}.
#
# The class maintains a buffer which is flushed in batches to the Timber API. Two
# options control when the flush happens: `:batch_size` and `:flush_interval`.
# If either of these is surpassed, the buffer will be flushed.
#
# By default, the buffer will apply back pressure when log messages are generated faster
# than the client can deliver them. But you can drop messages instead by passing a
# {DroppingSizedQueue} via the `:request_queue` option.
#
# @param api_key [String] The API key provided to you after you add your application to
# [Timber](https://timber.io).
# @param [Hash] options the options to create a HTTP log device with.
# @option options [Integer] :batch_size (1000) Determines the maximum number of log lines in
#   each HTTP payload. Once the buffer reaches this limit an HTTP request will be issued. Bigger
#   payloads mean higher throughput, but also use more memory. Timber will not accept
#   payloads larger than 1mb.
# @option options [Numeric] :flush_interval (1) How often the client should
#   attempt to deliver logs to the Timber API, in fractional seconds. The HTTP client buffers
#   logs and this option represents how often that will happen, assuming `:batch_size`
#   is not met.
# @option options [Integer] :requests_per_conn (2500) The number of requests to send over a
#   single persistent connection. After this number is met, the connection will be closed
#   and a new one will be opened.
# @option options [SizedQueue] :request_queue (SizedQueue.new(3)) The request queue object that queues Net::HTTP
#   requests for delivery. By default this is a `SizedQueue` of size `3`, meaning that once
#   3 requests are placed on the queue for delivery, back pressure will be applied. If
#   you'd prefer to drop messages instead, pass a {DroppingSizedQueue}. See the examples
#   below.
# @option options [String] :timber_url The Timber URL to deliver the log lines to. The
#   default is set via {TIMBER_URL}.
#
# @example Basic usage
# Timber::Logger.new(Timber::LogDevices::HTTP.new("my_timber_api_key"))
#
# @example Dropping messages instead of applying back pressure
# http_log_device = Timber::LogDevices::HTTP.new("my_timber_api_key",
# request_queue: Timber::LogDevices::HTTP::DroppingSizedQueue.new(3))
# Timber::Logger.new(http_log_device)
def initialize(api_key, options = {})
  raise(ArgumentError.new("The api_key parameter cannot be blank")) unless api_key

  @api_key = api_key

  # Endpoint precedence: explicit option, then the environment, then the built-in default.
  endpoint = options[:timber_url] || ENV['TIMBER_URL'] || TIMBER_URL
  @timber_url = URI.parse(endpoint)

  @batch_size = options[:batch_size] || 1_000
  @flush_interval = options[:flush_interval] || 1 # seconds
  @requests_per_conn = options[:requests_per_conn] || 2_500

  # Buffer of individual log messages; reports full once it holds @batch_size of them.
  @msg_queue = LogMsgQueue.new(@batch_size)
  debug_logger.info("[#initialize] msg queue object id: #{@msg_queue.object_id}") if debug_logger

  # Queue of prepared HTTP requests consumed by the outlet thread.
  @request_queue = options[:request_queue] || SizedQueue.new(3)
  @requests_in_flight = 0

  # Background threads are started unless the caller explicitly passes `threads: false`.
  return if options[:threads] == false

  @outlet_thread = Thread.new { outlet }
  @flush_thread = Thread.new { intervaled_flush }
end
# Write a new log line message to the buffer, and flush the buffer once it has
# reached the configured batch size.
#
# Debug output is only emitted when a debug logger is configured, so the device
# keeps working when debug logging is turned off.
#
# @param msg [Object] the log message to buffer.
# @return [true] always.
def write(msg)
  logger = debug_logger
  if logger
    logger.info("[#write] msg queue object id: #{@msg_queue.object_id}")
    logger.info("[#write] queue size before: #{@msg_queue.size}")
  end

  @msg_queue.enqueue(msg)
  logger.info("[#write] queue size after: #{@msg_queue.size}") if logger

  if @msg_queue.full?
    logger.debug("Flushing timber buffer via write") if logger
    flush
  end

  true
end
# Closes the log device, cleans up, and attempts one last delivery.
#
# The final flush is queued while the outlet thread is still running, and the
# outlet is given a bounded amount of time to send it before being stopped.
# Killing the outlet first would leave the last batch sitting in the request
# queue with nothing to deliver it.
#
# @return [void]
def close
  # The interval thread is no longer needed; the final flush happens right below.
  @flush_thread.kill if @flush_thread

  flush

  return unless @outlet_thread

  # Wait (at most ~5 seconds) for queued and in-flight requests to go out.
  # Stop waiting early if the outlet thread has already died.
  deadline = Time.now + 5
  while @outlet_thread.alive? &&
        (@requests_in_flight > 0 || !@request_queue.empty?) &&
        Time.now < deadline
    sleep(0.1)
  end

  @outlet_thread.kill
end
private
# Fetches the debug logger from the global Timber configuration.
#
# @return [Object, nil] whatever `Timber::Config.instance.debug_logger` returns.
#   Callers in this class treat a falsy value as "debug logging is off".
def debug_logger
  config = Timber::Config.instance
  config.debug_logger
end
# Drains the message buffer and queues its contents for delivery as one HTTP
# POST request.
#
# The flush time is recorded before anything else, so the interval timer is
# reset even when the buffer turns out to be empty. Debug output is only
# emitted when a debug logger is configured.
#
# @return [Object, nil] nil when there was nothing to send; otherwise whatever
#   the request queue's `enq` returns.
def flush
  @last_flush = Time.now

  logger = debug_logger
  if logger
    logger.info("[#flush] msg queue object id: #{@msg_queue.object_id}")
    logger.info("[#flush] msg queue size before: #{@msg_queue.size}")
  end

  msgs = @msg_queue.flush
  logger.info("[#flush] msg queue size after: #{@msg_queue.size}") if logger
  return if msgs.empty?

  req = Net::HTTP::Post.new(@timber_url.path)
  req['Authorization'] = authorization_payload
  req['Content-Type'] = CONTENT_TYPE
  req['User-Agent'] = USER_AGENT
  req.body = msgs.to_msgpack

  # Hands the request to the outlet thread; may block or drop depending on
  # the queue implementation that was configured.
  @request_queue.enq(req)
end
# Body of the background flush thread: after an initial delay of one flush
# interval, polls every 0.1 seconds and flushes whenever the interval has elapsed.
#
# A failing flush is reported to the debug logger (when one is configured) and
# the loop keeps running. Only StandardError is rescued, so signals and
# interpreter shutdown are not swallowed. The poll sleep sits outside the rescue
# so that a flush which keeps failing cannot turn the loop into a busy spin.
#
# @return [void] does not return under normal operation.
def intervaled_flush
  # Wait specified time period before starting
  sleep @flush_interval

  loop do
    begin
      if intervaled_flush_ready?
        debug_logger.debug("Flushing timber buffer via the interval") if debug_logger
        flush
      end
    rescue => e
      logger = debug_logger
      logger.error("Timber intervaled flush failed: #{e.inspect}\n\n#{e.backtrace}") if logger
    end

    sleep(0.1)
  end
end
# Tells the interval thread whether a flush is due.
#
# @return [Boolean] true if no flush has been recorded yet, or if the absolute
#   difference between now and the last flush is at least the flush interval.
def intervaled_flush_ready?
  return true if @last_flush.nil?

  elapsed = Time.now.to_f - @last_flush.to_f
  elapsed.abs >= @flush_interval
end
# Body of the background delivery thread. Repeatedly opens a connection to the
# Timber endpoint and sends queued requests over it, opening a fresh connection
# after `@requests_per_conn` successful requests or after a connection-level
# failure.
#
# A request that raises is logged and discarded (it is not re-queued) and does
# not count toward the per-connection limit. After a connection-level failure
# the loop pauses briefly before reconnecting, so an unreachable endpoint does
# not turn into a tight reconnect loop.
#
# @return [void] does not return under normal operation.
def outlet
  # Seconds to wait before reconnecting after a connection-level failure.
  reconnect_delay = 1

  loop do
    http = Net::HTTP.new(@timber_url.host, @timber_url.port)
    http.set_debug_output(debug_logger) if debug_logger
    http.use_ssl = true if @timber_url.scheme == 'https'
    http.read_timeout = 30
    http.ssl_timeout = 10
    http.open_timeout = 10

    failed = false
    begin
      debug_logger.info("Starting Timber HTTP connection") if debug_logger
      http.start do |conn|
        num_reqs = 0
        while num_reqs < @requests_per_conn
          if debug_logger
            debug_logger.debug("Waiting on next Timber request")
            debug_logger.debug("Number of threads waiting on Timber request queue: #{@request_queue.num_waiting}")
          end

          # Blocks waiting for a request.
          req = @request_queue.deq
          @requests_in_flight += 1
          resp = nil
          begin
            resp = conn.request(req)
          rescue => e
            debug_logger.error("Timber request error: #{e.message}") if debug_logger
            # Skip the bookkeeping below; the ensure clause still runs.
            next
          ensure
            @requests_in_flight -= 1
          end

          num_reqs += 1
          debug_logger.debug("Timber request successful: #{resp.code}") if debug_logger
        end
      end
    rescue => e
      failed = true
      debug_logger.error("Timber request error: #{e.message}") if debug_logger
    ensure
      debug_logger.debug("Finishing Timber HTTP connection") if debug_logger
      http.finish if http.started?
    end

    # Back off only after a failure; a normal connection rollover reconnects immediately.
    sleep(reconnect_delay) if failed
  end
end
# Builds the value for the Authorization header and caches it, so the API key
# is only encoded once per device.
#
# NOTE(review): the key is encoded with the URL-safe Base64 alphabet; confirm
# that this is what the receiving API expects for Basic credentials.
#
# @return [String] "Basic " followed by the URL-safe Base64 encoding of the API key.
def authorization_payload
  return @authorization_payload if @authorization_payload

  encoded_key = Base64.urlsafe_encode64(@api_key).chomp
  @authorization_payload = "Basic #{encoded_key}"
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment