require "kafka" require "logger" require "optparse" script_name = File.basename($0, File.extname($0)) default_logfile = "logs/#{script_name}.log" default_offset = "latest" options = {} OptionParser.new do |opts| opts.banner = "Usage: kafka-consumer.rb [options]" opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic| options[:topic] = topic end opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers| options[:brokers] = brokers.split(",") end opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile| options[:logfile] = logfile end opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id| options[:client_id] = client_id end opts.on("-o OFFSET", "--offset", "Use \"earliest\" to fetch all messages in the topic, \"latest\" to only fetch messages produced after this consumer starts. Defaults to \"#{default_offset}\".") do |offset| unless ["earliest", "latest"].include? offset raise ArgumentError, "Offset must be either \"earliest\" or \"latest\"" end options[:offset] = offset end end.parse! logfile = options[:logfile] || default_logfile logger = Logger.new(logfile) brokers = options[:brokers] || ENV.fetch("KAFKA").split(",") client_id = options[:client_id] || script_name topic = options[:topic] offset = (options[:offset] || default_offset).to_sym kafka = Kafka.new( seed_brokers: brokers, client_id: client_id, socket_timeout: 20, logger: logger, ) begin partition = 0 consumed = 0 loop do messages = kafka.fetch_messages( topic: topic, partition: partition, offset: offset, ) messages.each do |message| puts message.value offset = message.offset + 1 consumed += 1 end end rescue Interrupt puts ensure puts "Consumed #{consumed} messages." kafka.close end