//usr/bin/env jbang "$0" "$@" ; exit $? //DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1 //DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1 //DEPS org.testcontainers:kafka:1.15.0-rc2 //JAVAC_OPTIONS -parameters //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager //JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer //JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest //JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=ignore package foo; import io.quarkus.runtime.Quarkus; import io.quarkus.runtime.annotations.QuarkusMain; import io.smallrye.mutiny.Multi; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.logging.Logger; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; import javax.enterprise.context.ApplicationScoped; public class KafkaIgnoreFailure { static final Logger LOGGER = Logger.getLogger("Kafka-Ignore"); @QuarkusMain static class Main { public static void main(String... args) { LOGGER.info("Starting Kafka..."); KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); kafka.start(); LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); Quarkus.run(args); } } @ApplicationScoped public static class MovieProcessor { @Incoming("movies") public void consume(String movie) { LOGGER.infof("Receiving movie %s", movie); if (movie.contains("'")) { throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); } if (movie.contains(",")) { throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); } } } @ApplicationScoped public static class MovieWriter { @Outgoing("movies-out") public Multi generate() { return Multi.createFrom().items( "The Shawshank Redemption", "The Godfather", "The Godfather: Part II", "The Dark Knight", "12 Angry Men", "Schindler's List", "The Lord of the Rings: The Return of the King", "Pulp Fiction", "The Good, the Bad and the Ugly", "The Lord of the Rings: The Fellowship of the Ring" ); } } }