Skip to content

Instantly share code, notes, and snippets.

@csaltos
Last active May 5, 2020 12:09
Show Gist options
  • Select an option

  • Save csaltos/b3d79d43c9986024f43bac3481480d94 to your computer and use it in GitHub Desktop.

Select an option

Save csaltos/b3d79d43c9986024f43bac3481480d94 to your computer and use it in GitHub Desktop.

Revisions

  1. csaltos revised this gist May 5, 2020. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions ConsumerDemo.java
    Original file line number Diff line number Diff line change
    @@ -32,13 +32,13 @@ public void run() {
    countDownLatch = new CountDownLatch(1);
    final Properties properties = new Properties();
    final Duration pollTimeout = Duration.ofMillis(100);
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-demo-3");
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton("topic2x3"));
    consumer.subscribe(Collections.singleton("first_topic"));
    try {
    while (true) {
    final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
  2. csaltos created this gist May 5, 2020.
    84 changes: 84 additions & 0 deletions ConsumerDemo.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,84 @@
    package com.tutorial1;

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Date;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    public class ConsumerDemo {

    public static void main(String[] args) {
    ConsumerDemoWorker consumerDemoWorker = new ConsumerDemoWorker();
    new Thread(consumerDemoWorker).start();
    Runtime.getRuntime().addShutdownHook(new Thread(new ConsumerDemoCloser(consumerDemoWorker)));
    }

    private static class ConsumerDemoWorker implements Runnable {

    private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWorker.class);

    private CountDownLatch countDownLatch;
    private Consumer<String, String> consumer;

    @Override
    public void run() {
    countDownLatch = new CountDownLatch(1);
    final Properties properties = new Properties();
    final Duration pollTimeout = Duration.ofMillis(100);
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-demo-3");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singleton("topic2x3"));
    try {
    while (true) {
    final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
    for (final ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    log.info("Getting consumer record key: '" + consumerRecord.key() + "', value: '" + consumerRecord.value() + "', partition: " + consumerRecord.partition() + " and offset: " + consumerRecord.offset() + " at " + new Date(consumerRecord.timestamp()));
    }
    }
    } catch (WakeupException e) {
    log.info("Consumer poll woke up");
    consumer.close();
    } finally {
    countDownLatch.countDown();
    }
    }

    void shutdown() throws InterruptedException {
    consumer.wakeup();
    countDownLatch.await();
    log.info("Consumer closed");
    }

    }

    private static class ConsumerDemoCloser implements Runnable {

    private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCloser.class);

    private final ConsumerDemoWorker consumerDemoWorker;

    ConsumerDemoCloser(final ConsumerDemoWorker consumerDemoWorker) {
    this.consumerDemoWorker = consumerDemoWorker;
    }

    @Override
    public void run() {
    try {
    consumerDemoWorker.shutdown();
    } catch (InterruptedException e) {
    log.error("Error shutting down consumer", e);
    }
    }
    }
    }