package com.confluent.benstopford; import kafka.consumer.*; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.serializer.StringDecoder; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; import org.apache.curator.test.TestingServer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; /** * Kafka testing at its most simple. * You'll need the following in your pom: * org.apache.kafka kafka_2.10 0.8.2.1 junit junit 4.11 test org.apache.curator curator-test 2.8.0 test */ public class KafkaMostBasicTest { public static final String topic = "topic1-" + System.currentTimeMillis(); private KafkaTestFixture server; private Producer producer; private ConsumerConnector consumerConnector; @Before public void setup() throws Exception { server = new KafkaTestFixture(); server.start(serverProperties()); } @After public void teardown() throws Exception { producer.close(); consumerConnector.shutdown(); server.stop(); } @Test public void shouldWriteThenRead() throws Exception { //Create a consumer ConsumerIterator it = buildConsumer(KafkaMostBasicTest.topic); //Create a producer producer = new KafkaProducer(producerProps()); //send a message producer.send(new ProducerRecord(KafkaMostBasicTest.topic, "message")).get(); //read it back MessageAndMetadata messageAndMetadata = it.next(); String value = messageAndMetadata.message(); assertThat(value, is("message")); } private ConsumerIterator buildConsumer(String topic) { Properties props = consumerProperties(); Map topicCountMap = new HashMap(); topicCountMap.put(topic, 1); ConsumerConfig consumerConfig = new ConsumerConfig(props); consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map>> consumers = consumerConnector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null)); KafkaStream stream = consumers.get(topic).get(0); return stream.iterator(); } private Properties consumerProperties() { Properties props = new Properties(); props.put("zookeeper.connect", serverProperties().get("zookeeper.connect")); props.put("group.id", "group1"); props.put("auto.offset.reset", "smallest"); return props; } private Properties producerProps() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("request.required.acks", "1"); return props; } private Properties serverProperties() { Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("broker.id", "1"); return props; } private static class KafkaTestFixture { private TestingServer zk; private KafkaServerStartable kafka; public void start(Properties properties) throws Exception { Integer port = getZkPort(properties); zk = new TestingServer(port); zk.start(); KafkaConfig kafkaConfig = new KafkaConfig(properties); kafka = new KafkaServerStartable(kafkaConfig); kafka.startup(); } public void stop() throws IOException { kafka.shutdown(); zk.stop(); zk.close(); } private int getZkPort(Properties properties) { String url = (String) properties.get("zookeeper.connect"); String port = url.split(":")[1]; return Integer.valueOf(port); } } }