Last active
January 23, 2020 05:14
-
-
Save sdball/f4f95cf764ae37891baf1e8e8abd1788 to your computer and use it in GitHub Desktop.
Revisions
-
sdball revised this gist
Jun 15, 2016 . 2 changed files with 12 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -32,6 +32,10 @@ end options[:offset] = offset end opts.on("-g GOAL", "--goal", "Set an expected number of messages to consume. (Use to time production to Kafka.)") do |goal| options[:goal] = goal end end.parse! logfile = options[:logfile] || default_logfile @@ -40,6 +44,7 @@ client_id = options[:client_id] || script_name topic = options[:topic] offset = (options[:offset] || default_offset).to_sym goal = options[:goal].to_i kafka = Kafka.new( seed_brokers: brokers, @@ -51,6 +56,7 @@ begin partition = 0 consumed = 0 start_time = nil loop do messages = kafka.fetch_messages( @@ -60,14 +66,18 @@ ) messages.each do |message| start_time ||= Time.now.to_i offset = message.offset + 1 consumed += 1 puts "#{offset}: #{message.value} [Cumulative Runtime: #{Time.now.to_i - start_time} seconds] [#{consumed} messages so far]" end break if (goal > 0 && consumed >= goal) end rescue Interrupt puts ensure puts "Consumed #{consumed} messages." puts "Ran for #{Time.now.to_i - start_time} seconds." if start_time kafka.close end This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -58,4 +58,4 @@ puts "Produced #{produced} messages." producer.deliver_messages producer.shutdown end -
sdball revised this gist
May 5, 2016 . No changes.There are no files selected for viewing
-
sdball created this gist
May 5, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,73 @@ require "kafka" require "logger" require "optparse" script_name = File.basename($0, File.extname($0)) default_logfile = "logs/#{script_name}.log" default_offset = "latest" options = {} OptionParser.new do |opts| opts.banner = "Usage: kafka-consumer.rb [options]" opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic| options[:topic] = topic end opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers| options[:brokers] = brokers.split(",") end opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile| options[:logfile] = logfile end opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id| options[:client_id] = client_id end opts.on("-o OFFSET", "--offset", "Use \"earliest\" to fetch all messages in the topic, \"latest\" to only fetch messages produced after this consumer starts. Defaults to \"#{default_offset}\".") do |offset| unless ["earliest", "latest"].include? offset raise ArgumentError, "Offset must be either \"earliest\" or \"latest\"" end options[:offset] = offset end end.parse! logfile = options[:logfile] || default_logfile logger = Logger.new(logfile) brokers = options[:brokers] || ENV.fetch("KAFKA").split(",") client_id = options[:client_id] || script_name topic = options[:topic] offset = (options[:offset] || default_offset).to_sym kafka = Kafka.new( seed_brokers: brokers, client_id: client_id, socket_timeout: 20, logger: logger, ) begin partition = 0 consumed = 0 loop do messages = kafka.fetch_messages( topic: topic, partition: partition, offset: offset, ) messages.each do |message| puts message.value offset = message.offset + 1 consumed += 1 end end rescue Interrupt puts ensure puts "Consumed #{consumed} messages." kafka.close end This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,61 @@ require "kafka" require "logger" require "optparse" require "snappy" script_name = File.basename($0, File.extname($0)) default_logfile = "logs/#{script_name}.log" options = {} OptionParser.new do |opts| opts.banner = "Usage: kafka-producer.rb [options]" opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic| options[:topic] = topic end opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers| options[:brokers] = brokers.split(",") end opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile| options[:logfile] = logfile end opts.on("-c COMPRESSION", "--compression", "Compression codec to use. \"gzip\" or \"snappy\"") do |compression| options[:compression] = compression.to_sym end opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id| options[:client_id] = client_id end end.parse! logfile = options[:logfile] || default_logfile logger = Logger.new(logfile) brokers = options[:brokers] || ENV.fetch("KAFKA").split(",") client_id = options[:client_id] || script_name topic = options[:topic] kafka = Kafka.new( seed_brokers: brokers, client_id: client_id, logger: logger, ) producer = kafka.producer(compression_codec: options[:compression]) produced = 0 begin $stdin.each do |line| produced += 1 producer.produce(line, topic: topic) producer.deliver_messages end rescue Interrupt puts ensure puts "Produced #{produced} messages." producer.deliver_messages producer.shutdown end