Skip to content

Instantly share code, notes, and snippets.

@josergdev
Created January 8, 2025 21:56
Show Gist options
  • Save josergdev/7fe6d90ecd535c5901bc87d12885467b to your computer and use it in GitHub Desktop.
Save josergdev/7fe6d90ecd535c5901bc87d12885467b to your computer and use it in GitHub Desktop.

Revisions

  1. josergdev created this gist Jan 8, 2025.
    254 changes: 254 additions & 0 deletions OutboxIntegrationApplication.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,254 @@
    package dev.joserg.outboxintegration;

    import org.postgresql.jdbc.PgConnection;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.jdbc.DataSourceBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.Gateway;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
    import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
    import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
    import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
    import org.springframework.integration.store.ChannelMessageStore;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.sql.DataSource;
    import java.io.Serializable;
    import java.time.*;
    import java.time.format.DateTimeFormatter;
    import java.util.UUID;
    import java.util.random.RandomGenerator;

    import static dev.joserg.outboxintegration.OutboxIntegrationApplication.SpringConfiguration.Channel.*;

    @SpringBootApplication
    public class OutboxIntegrationApplication {

    public static void main(String[] args) {
    SpringApplication.run(OutboxIntegrationApplication.class, args);
    }


    @RestController
    public static class GreetingController {

    private final Publisher publisher;

    public GreetingController(Publisher publisher) {
    this.publisher = publisher;
    }

    @GetMapping("/greet")
    public String greet() {
    this.publisher.publish(new GreetingMessage(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME)));
    return "Hello!";
    }

    }

    @MessagingGateway
    public interface Publisher {

    @Gateway(requestChannel = "journalChannel")
    void publish(GreetingMessage greetingMessage);
    }

    public record GreetingMessage(String instant) implements Serializable {
    }

    @Component
    public static class KafkaPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class);
    private final RandomGenerator randomGenerator = RandomGenerator.getDefault();


    public void publish(Message<GreetingMessage> greetingMessage) {
    if (randomGenerator.nextBoolean()) {
    LOGGER.info("Sending to kafka: {}", greetingMessage);
    } else {
    throw new RuntimeException("Kafka server down");
    }
    }
    }

    @Component
    public static class RabbitPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitPublisher.class);
    private final RandomGenerator randomGenerator = RandomGenerator.getDefault();


    public void publish(Message<GreetingMessage> greetingMessage) {
    if (randomGenerator.nextBoolean()) {
    LOGGER.info("Sending to rabbit: {}", greetingMessage);
    } else {
    throw new RuntimeException("rabbit server down");
    }
    }
    }

    @Component
    public static class JournalPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(JournalPublisher.class);
    private final RandomGenerator randomGenerator = RandomGenerator.getDefault();


    public void publish(Message<GreetingMessage> greetingMessage) {
    if (randomGenerator.nextBoolean()) {
    LOGGER.info("Sending to journal: {}", greetingMessage);
    } else {
    throw new RuntimeException("journal server down");
    }
    }
    }

    @Configuration
    public static class SpringConfiguration {

    public enum Channel {
    JOURNAL, KAFKA, RABBIT;

    public static UUID id(Channel channel) {
    return switch (channel) {
    case JOURNAL -> UUID.fromString("00000000-0000-0000-0000-000000000001");
    case KAFKA -> UUID.fromString("00000000-0000-0000-0000-000000000002");
    case RABBIT -> UUID.fromString("00000000-0000-0000-0000-000000000003");
    };
    }

    public UUID id() {
    return id(this);
    }
    }




    @Bean
    @ConfigurationProperties("spring.datasource")
    public DataSource dataSource() {
    return DataSourceBuilder.create().build();
    }

    @Bean
    public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {
    JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
    return jdbcChannelMessageStore;
    }

    @Bean
    public PostgresChannelMessageTableSubscriber messageStoreSubscriber(DataSource dataSource) {
    return new PostgresChannelMessageTableSubscriber(() -> dataSource.getConnection().unwrap(PgConnection.class));
    }


    @Bean
    QueueChannel journalChannel(ChannelMessageStore channelMessageStore) {
    return MessageChannels.queue(channelMessageStore, JOURNAL.id()).getObject();
    }

    @Bean
    public PostgresSubscribableChannel journalSubscription(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore,
    DataSourceTransactionManager dataSourceTransactionManager) {
    var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, JOURNAL.id(), subscriber);
    postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager);
    return postgresSubscribableChannel;
    }


    @Bean
    QueueChannel kafkaChannel(ChannelMessageStore channelMessageStore) {
    return MessageChannels.queue(channelMessageStore, KAFKA.id()).getObject();
    }

    @Bean
    public PostgresSubscribableChannel kafkaSubscription(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore,
    DataSourceTransactionManager dataSourceTransactionManager) {
    var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, KAFKA.id(), subscriber);
    postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager);
    return postgresSubscribableChannel;
    }

    @Bean
    QueueChannel rabbitChannel(ChannelMessageStore channelMessageStore) {
    return MessageChannels.queue(channelMessageStore, RABBIT.id()).getObject();
    }

    @Bean
    public PostgresSubscribableChannel rabbitSubscription(
    PostgresChannelMessageTableSubscriber subscriber,
    JdbcChannelMessageStore messageStore,
    DataSourceTransactionManager dataSourceTransactionManager) {
    var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, RABBIT.id(), subscriber);
    postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager);
    return postgresSubscribableChannel;
    }


    @Bean
    public IntegrationFlow journalFlow(@Qualifier("journalSubscription") PostgresSubscribableChannel journalSubscription,
    @Qualifier("kafkaChannel") QueueChannel kafkaChannel,
    @Qualifier("rabbitChannel") QueueChannel rabbitChannel,
    JournalPublisher journalPublisher) {
    return IntegrationFlow
    .from(journalSubscription)
    .<GreetingMessage>handle(
    (payload, headers) -> {
    var message = MessageBuilder.createMessage(payload, headers);
    journalPublisher.publish(message);
    return message;
    },
    configurer -> configurer.requiresReply(true))
    .routeToRecipients(configurer -> configurer
    .recipient(rabbitChannel)
    .recipient(kafkaChannel))
    .get();
    }

    @Bean
    public IntegrationFlow kafkaFlow(@Qualifier("kafkaSubscription") PostgresSubscribableChannel kafkaSubscription,
    KafkaPublisher kafkaPublisher) {
    return IntegrationFlow.from(kafkaSubscription)
    .handle(message -> kafkaPublisher.publish((Message<GreetingMessage>) message))
    .get();
    }

    @Bean
    public IntegrationFlow rabbitFlow(@Qualifier("rabbitSubscription") PostgresSubscribableChannel rabbitSubscription,
    RabbitPublisher rabbitPublisher) {
    return IntegrationFlow.from(rabbitSubscription)
    .handle(message -> rabbitPublisher.publish((Message<GreetingMessage>) message))
    .get();
    }


    }

    }