Last active
May 5, 2020 12:09
-
-
Save csaltos/b3d79d43c9986024f43bac3481480d94 to your computer and use it in GitHub Desktop.
Revisions
-
csaltos revised this gist
May 5, 2020 . 1 changed file with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -32,13 +32,13 @@ public void run() { countDownLatch = new CountDownLatch(1); final Properties properties = new Properties(); final Duration pollTimeout = Duration.ofMillis(100); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-application"); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton("first_topic")); try { while (true) { final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout); -
csaltos created this gist
May 5, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,84 @@ package com.tutorial1; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.Date; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class ConsumerDemo { public static void main(String[] args) { ConsumerDemoWorker consumerDemoWorker = new ConsumerDemoWorker(); new Thread(consumerDemoWorker).start(); Runtime.getRuntime().addShutdownHook(new Thread(new ConsumerDemoCloser(consumerDemoWorker))); } private static class ConsumerDemoWorker implements Runnable { private static final Logger log = LoggerFactory.getLogger(ConsumerDemoWorker.class); private CountDownLatch countDownLatch; private Consumer<String, String> consumer; @Override public void run() { countDownLatch = new CountDownLatch(1); final Properties properties = new Properties(); final Duration pollTimeout = Duration.ofMillis(100); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-demo-3"); properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singleton("topic2x3")); try { while (true) { final ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout); for (final ConsumerRecord<String, String> consumerRecord : consumerRecords) { log.info("Getting consumer record key: '" + consumerRecord.key() + "', value: '" + consumerRecord.value() + "', partition: " + consumerRecord.partition() + " and offset: " + consumerRecord.offset() + " at " + new Date(consumerRecord.timestamp())); } } } catch (WakeupException e) { log.info("Consumer poll woke up"); consumer.close(); } finally { countDownLatch.countDown(); } } void shutdown() throws InterruptedException { consumer.wakeup(); countDownLatch.await(); log.info("Consumer closed"); } } private static class ConsumerDemoCloser implements Runnable { private static final Logger log = LoggerFactory.getLogger(ConsumerDemoCloser.class); private final ConsumerDemoWorker consumerDemoWorker; ConsumerDemoCloser(final ConsumerDemoWorker consumerDemoWorker) { this.consumerDemoWorker = consumerDemoWorker; } @Override public void run() { try { consumerDemoWorker.shutdown(); } catch (InterruptedException e) { log.error("Error shutting down consumer", e); } } } }