Created
February 27, 2019 15:09
-
-
Save itzg/e14fe6965175b65e14881e7c5264c707 to your computer and use it in GitHub Desktop.
Revisions
-
itzg created this gist
Feb 27, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,113 @@ import static org.junit.Assert.assertThat; import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey; import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue; import static org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord; import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) // Test with a "real", but embedded Kafka instance, gives us the EmbeddedKafkaBroker to autowire @EmbeddedKafka( // We're only needing to test Kafka serializing interactions, so keep partitioning simple partitions = 1, // use some non-default topics to test via topics = { KafkaEgressTest.TOPIC_METRICS }) // Using @SpringBootTest mainly so we can get the standard properties binding bootstrap processing // and the properties source. The loading and binding of {@link KafkaTopicProperties} is one of // the main things we're testing in this suite. @SpringBootTest( // tell Spring Boot Kafka auto-config about the embedded kafka endpoints properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", // slice our unit test app context down to just these specific pieces classes = { // ...the service to test OurService.class, // ...use standard Sprint Boot kafka auto-config to give us KafkaTemplate, etc KafkaAutoConfiguration.class, // ...and our additional test config KafkaProducerTest.TestConfig.class } ) public class KafkaProducerTest { public static final String TOPIC_METRICS = "test.metrics.json"; // Declare our own unit test Spring config @Configuration public static class TestConfig { // Adjust our standard topic properties to point metrics at our test topic @Bean public AppProperties appProperties() { final AppProperties properties = new AppProperties(); properties.setMetrics(TOPIC_METRICS); return properties; } } // IntelliJ gets confused finding this broker bean when @SpringBootTest is activated @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") // Autowire the kafka broker registered via @EmbeddedKafka @Autowired private EmbeddedKafkaBroker embeddedKafka; // Autowire the service we're testing @Autowired OurService ourService; @Test public void testMetricsEncodedAsSent() { ourService.send("tenant-1", KafkaMessageType.METRIC, "{\"id\":1}"); final Consumer<String, String> consumer = buildConsumer( StringDeserializer.class, StringDeserializer.class ); embeddedKafka.consumeFromEmbeddedTopics(consumer, TOPIC_METRICS); final ConsumerRecord<String, String> record = getSingleRecord(consumer, TOPIC_METRICS, 500); // Use Hamcrest matchers provided by spring-kafka-test // https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#hamcrest-matchers assertThat(record, hasKey("tenant-1")); assertThat(record, hasValue("{\"id\":1}")); } private <K,V> Consumer<K, V> buildConsumer(Class<? extends Deserializer> keyDeserializer, Class<? extends Deserializer> valueDeserializer) { // Use the procedure documented at https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#embedded-kafka-annotation final Map<String, Object> consumerProps = KafkaTestUtils .consumerProps("testMetricsEncodedAsSent", "true", embeddedKafka); // Since we're pre-sending the messages to test for, we need to read from start of topic consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // We need to match the ser/deser used in expected application config consumerProps .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName()); consumerProps .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName()); final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps); return consumerFactory.createConsumer(); } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,7 @@ spring: resources: # disable serving of static web files since this is a REST/Actuator only web app add-mappings: false kafka: producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer