Skip to content

Instantly share code, notes, and snippets.

@vzickner
Last active June 21, 2023 10:15
Show Gist options
  • Save vzickner/577c53164a97b9918a49e6c0235813f4 to your computer and use it in GitHub Desktop.
Save vzickner/577c53164a97b9918a49e6c0235813f4 to your computer and use it in GitHub Desktop.

Revisions

  1. vzickner revised this gist Mar 29, 2020. 1 changed file with 6 additions and 4 deletions.
    10 changes: 6 additions & 4 deletions SimpleKafkaTest.java
    Original file line number Diff line number Diff line change
    @@ -5,9 +5,10 @@
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.AfterAll;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInstance;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    @@ -31,6 +32,7 @@

    @EmbeddedKafka
    @ExtendWith(SpringExtension.class)
    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";
    @@ -42,7 +44,7 @@ public class SimpleKafkaTest {

    KafkaMessageListenerContainer<String, String> container;

    @BeforeEach
    @BeforeAll
    void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
    @@ -54,7 +56,7 @@ void setUp() {
    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterEach
    @AfterAll
    void tearDown() {
    container.stop();
    }
  2. vzickner revised this gist Mar 8, 2020. 2 changed files with 43 additions and 38 deletions.
    53 changes: 30 additions & 23 deletions SimpleKafkaTest.java
    Original file line number Diff line number Diff line change
    @@ -1,59 +1,66 @@
    package com.mimacom.kafka.eventsourcing;
    package com.example.demo;


    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.listener.MessageListener;
    import org.springframework.kafka.test.EmbeddedKafkaBroker;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.utils.ContainerTestUtils;
    import org.springframework.kafka.test.utils.KafkaTestUtils;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.test.context.junit.jupiter.SpringExtension;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    import static java.util.Collections.singleton;
    import static org.assertj.core.api.Assertions.assertThat;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @EmbeddedKafka
    @ExtendWith(SpringExtension.class)
    public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private Consumer<String, String> consumer;
    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @Before
    public void setUp() {
    @BeforeEach
    void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
    consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
    consumer.subscribe(singleton(TOPIC));
    consumer.poll(0);
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
    ContainerProperties containerProperties = new ContainerProperties(TOPIC);
    container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    records = new LinkedBlockingQueue<>();
    container.setupMessageListener((MessageListener<String, String>) records::add);
    container.start();
    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @After
    public void tearDown() {
    consumer.close();
    @AfterEach
    void tearDown() {
    container.stop();
    }

    @Test
    public void kafkaSetup_withTopic_ensureSendMessageIsReceived() {
    public void kafkaSetup_withTopic_ensureSendMessageIsReceived() throws Exception {
    // Arrange
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
    @@ -63,7 +70,7 @@ public void kafkaSetup_withTopic_ensureSendMessageIsReceived() {
    producer.flush();

    // Assert
    ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
    ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
    assertThat(singleRecord).isNotNull();
    assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
    assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    28 changes: 13 additions & 15 deletions pom.xml
    Original file line number Diff line number Diff line change
    @@ -1,26 +1,20 @@
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-test-sample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-test-sample</name>
    <description>Demo project for Spring Boot</description>

    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
    <version>2.2.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    </properties>

    @@ -38,8 +32,13 @@
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
    <exclusion>
    <groupId>org.junit.vintage</groupId>
    <artifactId>junit-vintage-engine</artifactId>
    </exclusion>
    </exclusions>
    </dependency>

    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    @@ -56,5 +55,4 @@
    </plugins>
    </build>


    </project>
  3. vzickner revised this gist Jul 28, 2019. 2 changed files with 6 additions and 6 deletions.
    10 changes: 5 additions & 5 deletions SimpleKafkaTest.java
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    package com.example.kafkatestsample;
    package com.mimacom.kafka.eventsourcing;


    import org.apache.kafka.clients.consumer.Consumer;
    @@ -15,8 +15,8 @@
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.test.EmbeddedKafkaBroker;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.rule.KafkaEmbedded;
    import org.springframework.kafka.test.utils.KafkaTestUtils;
    import org.springframework.test.context.junit4.SpringRunner;

    @@ -34,14 +34,14 @@ public class SimpleKafkaTest {
    private static final String TOPIC = "domain-events";

    @Autowired
    private KafkaEmbedded kafkaEmbedded;
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private Consumer<String, String> consumer;


    @Before
    public void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", kafkaEmbedded));
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
    consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
    consumer.subscribe(singleton(TOPIC));
    consumer.poll(0);
    @@ -55,7 +55,7 @@ public void tearDown() {
    @Test
    public void kafkaSetup_withTopic_ensureSendMessageIsReceived() {
    // Arrange
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

    // Act
    2 changes: 1 addition & 1 deletion pom.xml
    Original file line number Diff line number Diff line change
    @@ -14,7 +14,7 @@
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <version>2.1.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
    </parent>

  4. vzickner revised this gist May 19, 2019. No changes.
  5. vzickner revised this gist Oct 12, 2018. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion SimpleKafkaTest.java
    Original file line number Diff line number Diff line change
    @@ -2,7 +2,6 @@


    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
  6. vzickner revised this gist Oct 12, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion SimpleKafkaTest.java
    Original file line number Diff line number Diff line change
    @@ -43,9 +43,9 @@ public class SimpleKafkaTest {
    @Before
    public void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", kafkaEmbedded));
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
    consumer.subscribe(singleton(TOPIC));
    consumer.poll(0);
    }

    @After
  7. vzickner created this gist Oct 4, 2018.
    73 changes: 73 additions & 0 deletions SimpleKafkaTest.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,73 @@
    package com.example.kafkatestsample;


    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.rule.KafkaEmbedded;
    import org.springframework.kafka.test.utils.KafkaTestUtils;
    import org.springframework.test.context.junit4.SpringRunner;

    import java.util.HashMap;
    import java.util.Map;

    import static java.util.Collections.singleton;
    import static org.assertj.core.api.Assertions.assertThat;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @EmbeddedKafka
    public class SimpleKafkaTest {

    private static final String TOPIC = "domain-events";

    @Autowired
    private KafkaEmbedded kafkaEmbedded;

    private Consumer<String, String> consumer;


    @Before
    public void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", kafkaEmbedded));
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
    consumer.subscribe(singleton(TOPIC));
    }

    @After
    public void tearDown() {
    consumer.close();
    }

    @Test
    public void kafkaSetup_withTopic_ensureSendMessageIsReceived() {
    // Arrange
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

    // Act
    producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
    producer.flush();

    // Assert
    ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
    assertThat(singleRecord).isNotNull();
    assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
    assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }

    }
    60 changes: 60 additions & 0 deletions pom.xml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,60 @@
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-test-sample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-test-sample</name>
    <description>Demo project for Spring Boot</description>

    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    </properties>

    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
    </dependency>
    </dependencies>

    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>


    </project>