# frozen_string_literal: true

# Command-line Kafka producer.
#
# Reads lines from standard input and publishes each line as one message to
# the topic given with --topic, delivering after every line. On Ctrl-C or end
# of input it prints how many messages were produced, flushes whatever is still
# buffered and shuts the producer down.
#
# Options:
#   -t, --topic TOPIC              Kafka topic (required)
#   -b, --brokers BROKERS          comma-separated brokers (defaults to $KAFKA)
#   -l, --log-file LOGFILE         ruby-kafka logfile (defaults to logs/<script>.log)
#   -c, --compression COMPRESSION  compression codec, "gzip" or "snappy"
#       --client-id CLIENT_ID      Kafka client id (defaults to the script name)

require "fileutils"
require "kafka"
require "logger"
require "optparse"
require "snappy"

# Script name without directory or extension; used for the default logfile
# and the default client id.
script_name = File.basename($0, File.extname($0))
default_logfile = "logs/#{script_name}.log"

options = {}

parser = OptionParser.new do |opts|
  opts.banner = "Usage: kafka-producer.rb [options]"

  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end

  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end

  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end

  opts.on("-c COMPRESSION", "--compression", "Compression codec to use. \"gzip\" or \"snappy\"") do |compression|
    options[:compression] = compression.to_sym
  end

  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end
end
parser.parse!

# Validate required settings up front so a bad invocation fails with a usage
# message instead of an error raised from inside the producer loop.
topic = options[:topic]
abort "Missing required option: --topic TOPIC\n\n#{parser.help}" if topic.nil? || topic.empty?

brokers = options[:brokers] ||
          ENV.fetch("KAFKA") { abort "No brokers given: pass --brokers or set $KAFKA" }.split(",")
client_id = options[:client_id] || script_name

# Logger.new does not create missing parent directories, so make sure the
# log directory (e.g. the default "logs/") exists first.
logfile = options[:logfile] || default_logfile
FileUtils.mkdir_p(File.dirname(logfile))
logger = Logger.new(logfile)

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  logger: logger,
)

# options[:compression] is nil when -c was not given.
producer = kafka.producer(compression_codec: options[:compression])

produced = 0

begin
  $stdin.each do |line|
    producer.produce(line, topic: topic)
    # Count only after produce succeeded, so the summary is not off by one
    # when produce raises.
    produced += 1
    producer.deliver_messages
  end
rescue Interrupt
  # Ctrl-C: emit a newline so the summary starts on its own line.
  puts
ensure
  puts "Produced #{produced} messages."

  begin
    # Flush anything still buffered (e.g. interrupted between produce and
    # deliver).
    producer.deliver_messages
  ensure
    # Always release the producer, even if the final flush raises.
    producer.shutdown
  end
end