@sdball
Last active January 23, 2020 05:14
Revisions

  1. sdball revised this gist Jun 15, 2016. 2 changed files with 12 additions and 2 deletions.
    12 changes: 11 additions & 1 deletion kafka-consumer.rb
@@ -32,6 +32,10 @@
     end
     options[:offset] = offset
   end
+
+  opts.on("-g GOAL", "--goal", "Set an expected number of messages to consume. (Use to time production to Kafka.)") do |goal|
+    options[:goal] = goal
+  end
 end.parse!
 
 logfile = options[:logfile] || default_logfile
@@ -40,6 +44,7 @@
 client_id = options[:client_id] || script_name
 topic = options[:topic]
 offset = (options[:offset] || default_offset).to_sym
+goal = options[:goal].to_i
 
 kafka = Kafka.new(
   seed_brokers: brokers,
@@ -51,6 +56,7 @@
 begin
   partition = 0
   consumed = 0
+  start_time = nil
 
   loop do
     messages = kafka.fetch_messages(
@@ -60,14 +66,18 @@
     )
 
     messages.each do |message|
-      puts message.value
+      start_time ||= Time.now.to_i
       offset = message.offset + 1
       consumed += 1
+      puts "#{offset}: #{message.value} [Cumulative Runtime: #{Time.now.to_i - start_time} seconds] [#{consumed} messages so far]"
     end
+
+    break if (goal > 0 && consumed >= goal)
   end
 rescue Interrupt
   puts
 ensure
   puts "Consumed #{consumed} messages."
+  puts "Ran for #{Time.now.to_i - start_time} seconds." if start_time
   kafka.close
 end
    2 changes: 1 addition & 1 deletion kafka-producer.rb
@@ -58,4 +58,4 @@
   puts "Produced #{produced} messages."
   producer.deliver_messages
   producer.shutdown
-end
+end
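    The --goal addition turns the consumer into a crude end-to-end timer: it stamps start_time on the first message, prints a running count, and exits once the expected total arrives. A minimal sketch of that workflow, assuming a broker reachable at docker:9092 and a hypothetical topic named timing-test (neither is specified by the gist):

        # produce 10000 numbered messages, then time how long consuming them all takes
        seq 10000 | ruby kafka-producer.rb -t timing-test -b docker:9092
        ruby kafka-consumer.rb -t timing-test -b docker:9092 -o earliest -g 10000

    Once the goal is reached, the break drops out of the fetch loop and the ensure block reports the total consumed and the elapsed seconds.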
  2. sdball revised this gist May 5, 2016. No changes.
  3. sdball created this gist May 5, 2016.
    73 changes: 73 additions & 0 deletions kafka-consumer.rb
    @@ -0,0 +1,73 @@
require "kafka"
require "logger"
require "optparse"

script_name = File.basename($0, File.extname($0))
default_logfile = "logs/#{script_name}.log"
default_offset = "latest"

options = {}
OptionParser.new do |opts|
  opts.banner = "Usage: kafka-consumer.rb [options]"

  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end

  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end

  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end

  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end

  opts.on("-o OFFSET", "--offset", "Use \"earliest\" to fetch all messages in the topic, \"latest\" to only fetch messages produced after this consumer starts. Defaults to \"#{default_offset}\".") do |offset|
    unless ["earliest", "latest"].include? offset
      raise ArgumentError, "Offset must be either \"earliest\" or \"latest\""
    end
    options[:offset] = offset
  end
end.parse!

logfile = options[:logfile] || default_logfile
logger = Logger.new(logfile)
brokers = options[:brokers] || ENV.fetch("KAFKA").split(",")
client_id = options[:client_id] || script_name
topic = options[:topic]
offset = (options[:offset] || default_offset).to_sym

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  socket_timeout: 20,
  logger: logger,
)

begin
  # fetch from a single partition in a loop, advancing the offset past each message
  partition = 0
  consumed = 0

  loop do
    messages = kafka.fetch_messages(
      topic: topic,
      partition: partition,
      offset: offset, # :earliest, :latest, or an integer position
    )

    messages.each do |message|
      puts message.value
      offset = message.offset + 1
      consumed += 1
    end
  end
rescue Interrupt
  puts
ensure
  puts "Consumed #{consumed} messages."
  kafka.close
end
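    A possible invocation, with placeholder topic and broker values not taken from the gist:

        # print every message already in the topic, then keep following new ones
        KAFKA=docker:9092 ruby kafka-consumer.rb -t my-topic -o earliest

    The script reads only partition 0 and advances the fetch offset itself (offset = message.offset + 1), so it needs no consumer group coordination.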
    61 changes: 61 additions & 0 deletions kafka-producer.rb
    @@ -0,0 +1,61 @@
require "kafka"
require "logger"
require "optparse"
require "snappy"

script_name = File.basename($0, File.extname($0))
default_logfile = "logs/#{script_name}.log"

options = {}
OptionParser.new do |opts|
  opts.banner = "Usage: kafka-producer.rb [options]"

  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end

  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end

  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end

  opts.on("-c COMPRESSION", "--compression", "Compression codec to use. \"gzip\" or \"snappy\"") do |compression|
    options[:compression] = compression.to_sym
  end

  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end
end.parse!

logfile = options[:logfile] || default_logfile
logger = Logger.new(logfile)
brokers = options[:brokers] || ENV.fetch("KAFKA").split(",")
client_id = options[:client_id] || script_name
topic = options[:topic]

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  logger: logger,
)

producer = kafka.producer(compression_codec: options[:compression])
produced = 0

begin
  # produce one message per line of stdin, flushing to the broker immediately
  $stdin.each do |line|
    produced += 1
    producer.produce(line, topic: topic)
    producer.deliver_messages
  end
rescue Interrupt
  puts
ensure
  puts "Produced #{produced} messages."
  producer.deliver_messages
  producer.shutdown
end
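    A possible invocation, piping generated lines in on stdin (the topic and broker are placeholders; the snappy codec relies on the snappy gem the script already requires):

        export KAFKA=docker:9092
        seq 1000 | ruby kafka-producer.rb -t my-topic -c snappy

    Calling deliver_messages after every produce flushes each line to the broker as soon as it is read, trading batching throughput for prompt delivery when input arrives interactively.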