Skip to content

Instantly share code, notes, and snippets.

@itzg
Created February 27, 2019 15:09
Show Gist options
  • Save itzg/e14fe6965175b65e14881e7c5264c707 to your computer and use it in GitHub Desktop.
Save itzg/e14fe6965175b65e14881e7c5264c707 to your computer and use it in GitHub Desktop.

Revisions

  1. itzg created this gist Feb 27, 2019.
    113 changes: 113 additions & 0 deletions KafkaProducerTest.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,113 @@
    import static org.junit.Assert.assertThat;
    import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
    import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;
    import static org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord;

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.test.EmbeddedKafkaBroker;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.utils.KafkaTestUtils;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    // Test with a "real", but embedded Kafka instance, gives us the EmbeddedKafkaBroker to autowire
    @EmbeddedKafka(
    // We're only needing to test Kafka serializing interactions, so keep partitioning simple
    partitions = 1,
    // use some non-default topics to test via
    topics = {
    KafkaEgressTest.TOPIC_METRICS
    })
    // Using @SpringBootTest mainly so we can get the standard properties binding bootstrap processing
    // and the properties source. The loading and binding of {@link KafkaTopicProperties} is one of
    // the main things we're testing in this suite.
    @SpringBootTest(
    // tell Spring Boot Kafka auto-config about the embedded kafka endpoints
    properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
    // slice our unit test app context down to just these specific pieces
    classes = {
    // ...the service to test
    OurService.class,
    // ...use standard Sprint Boot kafka auto-config to give us KafkaTemplate, etc
    KafkaAutoConfiguration.class,
    // ...and our additional test config
    KafkaProducerTest.TestConfig.class
    }
    )
    public class KafkaProducerTest {

    public static final String TOPIC_METRICS = "test.metrics.json";

    // Declare our own unit test Spring config
    @Configuration
    public static class TestConfig {

    // Adjust our standard topic properties to point metrics at our test topic
    @Bean
    public AppProperties appProperties() {
    final AppProperties properties = new AppProperties();
    properties.setMetrics(TOPIC_METRICS);
    return properties;
    }
    }

    // IntelliJ gets confused finding this broker bean when @SpringBootTest is activated
    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    // Autowire the kafka broker registered via @EmbeddedKafka
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    // Autowire the service we're testing
    @Autowired
    OurService ourService;

    @Test
    public void testMetricsEncodedAsSent() {
    ourService.send("tenant-1", KafkaMessageType.METRIC, "{\"id\":1}");

    final Consumer<String, String> consumer = buildConsumer(
    StringDeserializer.class,
    StringDeserializer.class
    );

    embeddedKafka.consumeFromEmbeddedTopics(consumer, TOPIC_METRICS);
    final ConsumerRecord<String, String> record = getSingleRecord(consumer, TOPIC_METRICS, 500);

    // Use Hamcrest matchers provided by spring-kafka-test
    // https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#hamcrest-matchers
    assertThat(record, hasKey("tenant-1"));
    assertThat(record, hasValue("{\"id\":1}"));
    }

    private <K,V> Consumer<K, V> buildConsumer(Class<? extends Deserializer> keyDeserializer,
    Class<? extends Deserializer> valueDeserializer) {
    // Use the procedure documented at https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#embedded-kafka-annotation

    final Map<String, Object> consumerProps = KafkaTestUtils
    .consumerProps("testMetricsEncodedAsSent", "true", embeddedKafka);
    // Since we're pre-sending the messages to test for, we need to read from start of topic
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // We need to match the ser/deser used in expected application config
    consumerProps
    .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
    consumerProps
    .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());

    final DefaultKafkaConsumerFactory<K, V> consumerFactory =
    new DefaultKafkaConsumerFactory<>(consumerProps);
    return consumerFactory.createConsumer();
    }
    }
    7 changes: 7 additions & 0 deletions application.yml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,7 @@
    spring:
    resources:
    # disable serving of static web files since this is a REST/Actuator only web app
    add-mappings: false
    kafka:
    producer:
    value-serializer: org.apache.kafka.common.serialization.StringSerializer