Last active
June 20, 2025 21:03
-
-
Save cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a to your computer and use it in GitHub Desktop.
Revisions
-
cescoffier revised this gist
Oct 14, 2020 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,6 @@ //usr/bin/env jbang "$0" "$@" ; exit $? //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1 //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1 //DEPS org.testcontainers:kafka:1.15.0-rc2 //JAVAC_OPTIONS -parameters //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager -
cescoffier revised this gist
Oct 14, 2020 . 1 changed file with 94 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,94 @@ //usr/bin/env jbang "$0" "$@" ; exit $? //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1 //DEPS org.testcontainers:kafka:1.15.0-rc2 //JAVAC_OPTIONS -parameters //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest //JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=dead-letter-queue //JAVA_OPTIONS -Dmp.messaging.incoming.dead-letter-topic-movies.connector=smallrye-kafka -Dmp.messaging.incoming.dead-letter-topic-movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.dead-letter-topic-movies.auto.offset.reset=earliest package foo; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.annotations.QuarkusMain; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.logging.Logger; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import javax.enterprise.context.ApplicationScoped; import java.util.concurrent.CompletionStage; public class KafkaDeadLetterTopic { static final Logger LOGGER = Logger.getLogger("Kafka-Dead-Letter-Topic"); @QuarkusMain static class Main { public static void main(String... args) { LOGGER.info("Starting Kafka..."); KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); kafka.start(); LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); Quarkus.run(args); } } @ApplicationScoped public static class MovieProcessor { @Incoming("movies") public void consume(String movie) { LOGGER.infof("Receiving movie %s", movie); if (movie.contains("'")) { throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); } if (movie.contains(",")) { throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); } } } @ApplicationScoped public static class MovieWriter { @Outgoing("movies-out") public Multi<String> generate() { return Multi.createFrom().items( "The Shawshank Redemption", "The Godfather", "The Godfather: Part II", "The Dark Knight", "12 Angry Men", "Schindler's List", "The Lord of the Rings: The Return of the King", "Pulp Fiction", "The Good, the Bad and the Ugly", "The Lord of the Rings: The Fellowship of the Ring" ); } } @ApplicationScoped public static class DeadLetterTopicReader { @SuppressWarnings("unchecked") @Incoming("dead-letter-topic-movies") public CompletionStage<Void> dead(Message<String> rejected) { IncomingKafkaRecordMetadata<String, String> metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class) .orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka")); String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value()); LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason); return rejected.ack(); } } } -
cescoffier revised this gist
Oct 14, 2020 . 1 changed file with 77 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,77 @@ //usr/bin/env jbang "$0" "$@" ; exit $? //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1 //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1 //DEPS org.testcontainers:kafka:1.15.0-rc2 //JAVAC_OPTIONS -parameters //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest //JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=ignore package foo; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.annotations.QuarkusMain; import io.smallrye.mutiny.Multi; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.logging.Logger; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import javax.enterprise.context.ApplicationScoped; public class KafkaIgnoreFailure { static final Logger LOGGER = Logger.getLogger("Kafka-Ignore"); @QuarkusMain static class Main { public static void main(String... args) { LOGGER.info("Starting Kafka..."); KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); kafka.start(); LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); Quarkus.run(args); } } @ApplicationScoped public static class MovieProcessor { @Incoming("movies") public void consume(String movie) { LOGGER.infof("Receiving movie %s", movie); if (movie.contains("'")) { throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); } if (movie.contains(",")) { throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); } } } @ApplicationScoped public static class MovieWriter { @Outgoing("movies-out") public Multi<String> generate() { return Multi.createFrom().items( "The Shawshank Redemption", "The Godfather", "The Godfather: Part II", "The Dark Knight", "12 Angry Men", "Schindler's List", "The Lord of the Rings: The Return of the King", "Pulp Fiction", "The Good, the Bad and the Ugly", "The Lord of the Rings: The Fellowship of the Ring" ); } } } -
cescoffier created this gist
Oct 14, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,76 @@ //usr/bin/env jbang "$0" "$@" ; exit $? //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1 //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1 //DEPS org.testcontainers:kafka:1.15.0-rc2 //JAVAC_OPTIONS -parameters //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest package foo; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.annotations.QuarkusMain; import io.smallrye.mutiny.Multi; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.logging.Logger; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import javax.enterprise.context.ApplicationScoped; public class KafkaFailFast { static final Logger LOGGER = Logger.getLogger("Kafka-Fail-Fast"); @QuarkusMain static class Main { public static void main(String... args) { LOGGER.info("Starting Kafka..."); KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); kafka.start(); LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); Quarkus.run(args); } } @ApplicationScoped public static class MovieProcessor { @Incoming("movies") public void consume(String movie) { LOGGER.infof("Receiving movie %s", movie); if (movie.contains("'")) { throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); } if (movie.contains(",")) { throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); } } } @ApplicationScoped public static class MovieWriter { @Outgoing("movies-out") public Multi<String> generate() { return Multi.createFrom().items( "The Shawshank Redemption", "The Godfather", "The Godfather: Part II", "The Dark Knight", "12 Angry Men", "Schindler's List", "The Lord of the Rings: The Return of the King", "Pulp Fiction", "The Good, the Bad and the Ugly", "The Lord of the Rings: The Fellowship of the Ring" ); } } }