Skip to content

Instantly share code, notes, and snippets.

@cescoffier
Last active June 20, 2025 21:03
Show Gist options
  • Select an option

  • Save cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a to your computer and use it in GitHub Desktop.

Select an option

Save cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a to your computer and use it in GitHub Desktop.

Revisions

  1. cescoffier revised this gist Oct 14, 2020. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions KafkaDeadLetterTopic.java
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,6 @@
    //usr/bin/env jbang "$0" "$@" ; exit $?
    //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
    //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
    //DEPS org.testcontainers:kafka:1.15.0-rc2
    //JAVAC_OPTIONS -parameters
    //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
  2. cescoffier revised this gist Oct 14, 2020. 1 changed file with 94 additions and 0 deletions.
    94 changes: 94 additions & 0 deletions KafkaDeadLetterTopic.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,94 @@
    //usr/bin/env jbang "$0" "$@" ; exit $?
    //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
    //DEPS org.testcontainers:kafka:1.15.0-rc2
    //JAVAC_OPTIONS -parameters
    //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
    //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
    //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
    //JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=dead-letter-queue
    //JAVA_OPTIONS -Dmp.messaging.incoming.dead-letter-topic-movies.connector=smallrye-kafka -Dmp.messaging.incoming.dead-letter-topic-movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.dead-letter-topic-movies.auto.offset.reset=earliest

    package foo;

    import io.quarkus.runtime.Quarkus;
    import io.quarkus.runtime.annotations.QuarkusMain;
    import io.smallrye.mutiny.Multi;
    import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Message;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;
    import org.jboss.logging.Logger;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    import javax.enterprise.context.ApplicationScoped;
    import java.util.concurrent.CompletionStage;

    public class KafkaDeadLetterTopic {

    static final Logger LOGGER = Logger.getLogger("Kafka-Dead-Letter-Topic");

    @QuarkusMain
    static class Main {

    public static void main(String... args) {
    LOGGER.info("Starting Kafka...");
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));
    kafka.start();
    LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers());
    System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers());
    Quarkus.run(args);
    }
    }

    @ApplicationScoped
    public static class MovieProcessor {

    @Incoming("movies")
    public void consume(String movie) {
    LOGGER.infof("Receiving movie %s", movie);
    if (movie.contains("'")) {
    throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
    }
    if (movie.contains(",")) {
    throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
    }
    }

    }

    @ApplicationScoped
    public static class MovieWriter {

    @Outgoing("movies-out")
    public Multi<String> generate() {
    return Multi.createFrom().items(
    "The Shawshank Redemption",
    "The Godfather",
    "The Godfather: Part II",
    "The Dark Knight",
    "12 Angry Men",
    "Schindler's List",
    "The Lord of the Rings: The Return of the King",
    "Pulp Fiction",
    "The Good, the Bad and the Ugly",
    "The Lord of the Rings: The Fellowship of the Ring"
    );
    }

    }

    @ApplicationScoped
    public static class DeadLetterTopicReader {
    @SuppressWarnings("unchecked")
    @Incoming("dead-letter-topic-movies")
    public CompletionStage<Void> dead(Message<String> rejected) {
    IncomingKafkaRecordMetadata<String, String> metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class)
    .orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka"));
    String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value());
    LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason);

    return rejected.ack();
    }
    }
    }
  3. cescoffier revised this gist Oct 14, 2020. 1 changed file with 77 additions and 0 deletions.
    77 changes: 77 additions & 0 deletions KafkaIgnoreFailure.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,77 @@
    //usr/bin/env jbang "$0" "$@" ; exit $?
    //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
    //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
    //DEPS org.testcontainers:kafka:1.15.0-rc2
    //JAVAC_OPTIONS -parameters
    //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
    //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
    //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
    //JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=ignore

    package foo;

    import io.quarkus.runtime.Quarkus;
    import io.quarkus.runtime.annotations.QuarkusMain;
    import io.smallrye.mutiny.Multi;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;
    import org.jboss.logging.Logger;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    import javax.enterprise.context.ApplicationScoped;

    public class KafkaIgnoreFailure {

    static final Logger LOGGER = Logger.getLogger("Kafka-Ignore");

    @QuarkusMain
    static class Main {

    public static void main(String... args) {
    LOGGER.info("Starting Kafka...");
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));
    kafka.start();
    LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers());
    System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers());
    Quarkus.run(args);
    }
    }

    @ApplicationScoped
    public static class MovieProcessor {

    @Incoming("movies")
    public void consume(String movie) {
    LOGGER.infof("Receiving movie %s", movie);
    if (movie.contains("'")) {
    throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
    }
    if (movie.contains(",")) {
    throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
    }
    }

    }

    @ApplicationScoped
    public static class MovieWriter {

    @Outgoing("movies-out")
    public Multi<String> generate() {
    return Multi.createFrom().items(
    "The Shawshank Redemption",
    "The Godfather",
    "The Godfather: Part II",
    "The Dark Knight",
    "12 Angry Men",
    "Schindler's List",
    "The Lord of the Rings: The Return of the King",
    "Pulp Fiction",
    "The Good, the Bad and the Ugly",
    "The Lord of the Rings: The Fellowship of the Ring"
    );
    }

    }
    }
  4. cescoffier created this gist Oct 14, 2020.
    76 changes: 76 additions & 0 deletions KafkaFailFast.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@
    //usr/bin/env jbang "$0" "$@" ; exit $?
    //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
    //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
    //DEPS org.testcontainers:kafka:1.15.0-rc2
    //JAVAC_OPTIONS -parameters
    //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
    //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
    //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest

    package foo;

    import io.quarkus.runtime.Quarkus;
    import io.quarkus.runtime.annotations.QuarkusMain;
    import io.smallrye.mutiny.Multi;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;
    import org.jboss.logging.Logger;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;

    import javax.enterprise.context.ApplicationScoped;

    public class KafkaFailFast {

    static final Logger LOGGER = Logger.getLogger("Kafka-Fail-Fast");

    @QuarkusMain
    static class Main {

    public static void main(String... args) {
    LOGGER.info("Starting Kafka...");
    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));
    kafka.start();
    LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers());
    System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers());
    Quarkus.run(args);
    }
    }

    @ApplicationScoped
    public static class MovieProcessor {

    @Incoming("movies")
    public void consume(String movie) {
    LOGGER.infof("Receiving movie %s", movie);
    if (movie.contains("'")) {
    throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
    }
    if (movie.contains(",")) {
    throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
    }
    }

    }

    @ApplicationScoped
    public static class MovieWriter {

    @Outgoing("movies-out")
    public Multi<String> generate() {
    return Multi.createFrom().items(
    "The Shawshank Redemption",
    "The Godfather",
    "The Godfather: Part II",
    "The Dark Knight",
    "12 Angry Men",
    "Schindler's List",
    "The Lord of the Rings: The Return of the King",
    "Pulp Fiction",
    "The Good, the Bad and the Ugly",
    "The Lord of the Rings: The Fellowship of the Ring"
    );
    }

    }
    }