package dev.joserg.outboxintegration; import org.postgresql.jdbc.PgConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber; import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider; import org.springframework.integration.store.ChannelMessageStore; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.sql.DataSource; import java.io.Serializable; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.UUID; import java.util.random.RandomGenerator; import static dev.joserg.outboxintegration.OutboxIntegrationApplication.SpringConfiguration.Channel.*; @SpringBootApplication public class OutboxIntegrationApplication { public static void main(String[] args) { SpringApplication.run(OutboxIntegrationApplication.class, args); } @RestController public static class GreetingController { private final Publisher publisher; public GreetingController(Publisher publisher) { this.publisher = publisher; } @GetMapping("/greet") public String greet() { this.publisher.publish(new GreetingMessage(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME))); return "Hello!"; } } @MessagingGateway public interface Publisher { @Gateway(requestChannel = "journalChannel") void publish(GreetingMessage greetingMessage); } public record GreetingMessage(String instant) implements Serializable { } @Component public static class KafkaPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class); private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); public void publish(Message greetingMessage) { if (randomGenerator.nextBoolean()) { LOGGER.info("Sending to kafka: {}", greetingMessage); } else { throw new RuntimeException("Kafka server down"); } } } @Component public static class RabbitPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitPublisher.class); private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); public void publish(Message greetingMessage) { if (randomGenerator.nextBoolean()) { LOGGER.info("Sending to rabbit: {}", greetingMessage); } else { throw new RuntimeException("rabbit server down"); } } } @Component public static class JournalPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(JournalPublisher.class); private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); public void publish(Message greetingMessage) { if (randomGenerator.nextBoolean()) { LOGGER.info("Sending to journal: {}", greetingMessage); } else { throw new RuntimeException("journal server down"); } } } @Configuration public static class SpringConfiguration { public enum Channel { JOURNAL, KAFKA, RABBIT; public static UUID id(Channel channel) { return switch (channel) { case JOURNAL -> UUID.fromString("00000000-0000-0000-0000-000000000001"); case KAFKA -> UUID.fromString("00000000-0000-0000-0000-000000000002"); case RABBIT -> UUID.fromString("00000000-0000-0000-0000-000000000003"); }; } public UUID id() { return id(this); } } @Bean @ConfigurationProperties("spring.datasource") public DataSource dataSource() { return DataSourceBuilder.create().build(); } @Bean public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) { JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource); jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider()); return jdbcChannelMessageStore; } @Bean public PostgresChannelMessageTableSubscriber messageStoreSubscriber(DataSource dataSource) { return new PostgresChannelMessageTableSubscriber(() -> dataSource.getConnection().unwrap(PgConnection.class)); } @Bean QueueChannel journalChannel(ChannelMessageStore channelMessageStore) { return MessageChannels.queue(channelMessageStore, JOURNAL.id()).getObject(); } @Bean public PostgresSubscribableChannel journalSubscription( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore, DataSourceTransactionManager dataSourceTransactionManager) { var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, JOURNAL.id(), subscriber); postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); return postgresSubscribableChannel; } @Bean QueueChannel kafkaChannel(ChannelMessageStore channelMessageStore) { return MessageChannels.queue(channelMessageStore, KAFKA.id()).getObject(); } @Bean public PostgresSubscribableChannel kafkaSubscription( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore, DataSourceTransactionManager dataSourceTransactionManager) { var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, KAFKA.id(), subscriber); postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); return postgresSubscribableChannel; } @Bean QueueChannel rabbitChannel(ChannelMessageStore channelMessageStore) { return MessageChannels.queue(channelMessageStore, RABBIT.id()).getObject(); } @Bean public PostgresSubscribableChannel rabbitSubscription( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore, DataSourceTransactionManager dataSourceTransactionManager) { var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, RABBIT.id(), subscriber); postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); return postgresSubscribableChannel; } @Bean public IntegrationFlow journalFlow(@Qualifier("journalSubscription") PostgresSubscribableChannel journalSubscription, @Qualifier("kafkaChannel") QueueChannel kafkaChannel, @Qualifier("rabbitChannel") QueueChannel rabbitChannel, JournalPublisher journalPublisher) { return IntegrationFlow .from(journalSubscription) .handle( (payload, headers) -> { var message = MessageBuilder.createMessage(payload, headers); journalPublisher.publish(message); return message; }, configurer -> configurer.requiresReply(true)) .routeToRecipients(configurer -> configurer .recipient(rabbitChannel) .recipient(kafkaChannel)) .get(); } @Bean public IntegrationFlow kafkaFlow(@Qualifier("kafkaSubscription") PostgresSubscribableChannel kafkaSubscription, KafkaPublisher kafkaPublisher) { return IntegrationFlow.from(kafkaSubscription) .handle(message -> kafkaPublisher.publish((Message) message)) .get(); } @Bean public IntegrationFlow rabbitFlow(@Qualifier("rabbitSubscription") PostgresSubscribableChannel rabbitSubscription, RabbitPublisher rabbitPublisher) { return IntegrationFlow.from(rabbitSubscription) .handle(message -> rabbitPublisher.publish((Message) message)) .get(); } } }