Created
January 8, 2025 21:56
-
-
Save josergdev/7fe6d90ecd535c5901bc87d12885467b to your computer and use it in GitHub Desktop.
Revisions
-
josergdev created this gist
Jan 8, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,254 @@ package dev.joserg.outboxintegration; import org.postgresql.jdbc.PgConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber; import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel; import org.springframework.integration.jdbc.store.JdbcChannelMessageStore; import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider; import org.springframework.integration.store.ChannelMessageStore; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.sql.DataSource; import java.io.Serializable; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.UUID; import java.util.random.RandomGenerator; import static dev.joserg.outboxintegration.OutboxIntegrationApplication.SpringConfiguration.Channel.*; @SpringBootApplication public class OutboxIntegrationApplication { public static void main(String[] args) { SpringApplication.run(OutboxIntegrationApplication.class, args); } @RestController public static class GreetingController { private final Publisher publisher; public GreetingController(Publisher publisher) { this.publisher = publisher; } @GetMapping("/greet") public String greet() { this.publisher.publish(new GreetingMessage(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME))); return "Hello!"; } } @MessagingGateway public interface Publisher { @Gateway(requestChannel = "journalChannel") void publish(GreetingMessage greetingMessage); } public record GreetingMessage(String instant) implements Serializable { } @Component public static class KafkaPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class); private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); public void publish(Message<GreetingMessage> greetingMessage) { if (randomGenerator.nextBoolean()) { LOGGER.info("Sending to kafka: {}", greetingMessage); } else { throw new RuntimeException("Kafka server down"); } } } @Component public static class RabbitPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitPublisher.class); private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); public void publish(Message<GreetingMessage> greetingMessage) { if (randomGenerator.nextBoolean()) { LOGGER.info("Sending to rabbit: {}", greetingMessage); } else { throw new RuntimeException("rabbit server down"); } } } @Component public static class JournalPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(JournalPublisher.class); private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); public void publish(Message<GreetingMessage> greetingMessage) { if (randomGenerator.nextBoolean()) { LOGGER.info("Sending to journal: {}", greetingMessage); } else { throw new RuntimeException("journal server down"); } } } @Configuration public static class SpringConfiguration { public enum Channel { JOURNAL, KAFKA, RABBIT; public static UUID id(Channel channel) { return switch (channel) { case JOURNAL -> UUID.fromString("00000000-0000-0000-0000-000000000001"); case KAFKA -> UUID.fromString("00000000-0000-0000-0000-000000000002"); case RABBIT -> UUID.fromString("00000000-0000-0000-0000-000000000003"); }; } public UUID id() { return id(this); } } @Bean @ConfigurationProperties("spring.datasource") public DataSource dataSource() { return DataSourceBuilder.create().build(); } @Bean public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) { JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource); jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider()); return jdbcChannelMessageStore; } @Bean public PostgresChannelMessageTableSubscriber messageStoreSubscriber(DataSource dataSource) { return new PostgresChannelMessageTableSubscriber(() -> dataSource.getConnection().unwrap(PgConnection.class)); } @Bean QueueChannel journalChannel(ChannelMessageStore channelMessageStore) { return MessageChannels.queue(channelMessageStore, JOURNAL.id()).getObject(); } @Bean public PostgresSubscribableChannel journalSubscription( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore, DataSourceTransactionManager dataSourceTransactionManager) { var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, JOURNAL.id(), subscriber); postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); return postgresSubscribableChannel; } @Bean QueueChannel kafkaChannel(ChannelMessageStore channelMessageStore) { return MessageChannels.queue(channelMessageStore, KAFKA.id()).getObject(); } @Bean public PostgresSubscribableChannel kafkaSubscription( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore, DataSourceTransactionManager dataSourceTransactionManager) { var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, KAFKA.id(), subscriber); postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); return postgresSubscribableChannel; } @Bean QueueChannel rabbitChannel(ChannelMessageStore channelMessageStore) { return MessageChannels.queue(channelMessageStore, RABBIT.id()).getObject(); } @Bean public PostgresSubscribableChannel rabbitSubscription( PostgresChannelMessageTableSubscriber subscriber, JdbcChannelMessageStore messageStore, DataSourceTransactionManager dataSourceTransactionManager) { var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, RABBIT.id(), subscriber); postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); return postgresSubscribableChannel; } @Bean public IntegrationFlow journalFlow(@Qualifier("journalSubscription") PostgresSubscribableChannel journalSubscription, @Qualifier("kafkaChannel") QueueChannel kafkaChannel, @Qualifier("rabbitChannel") QueueChannel rabbitChannel, JournalPublisher journalPublisher) { return IntegrationFlow .from(journalSubscription) .<GreetingMessage>handle( (payload, headers) -> { var message = MessageBuilder.createMessage(payload, headers); journalPublisher.publish(message); return message; }, configurer -> configurer.requiresReply(true)) .routeToRecipients(configurer -> configurer .recipient(rabbitChannel) .recipient(kafkaChannel)) .get(); } @Bean public IntegrationFlow kafkaFlow(@Qualifier("kafkaSubscription") PostgresSubscribableChannel kafkaSubscription, KafkaPublisher kafkaPublisher) { return IntegrationFlow.from(kafkaSubscription) .handle(message -> kafkaPublisher.publish((Message<GreetingMessage>) message)) .get(); } @Bean public IntegrationFlow rabbitFlow(@Qualifier("rabbitSubscription") PostgresSubscribableChannel rabbitSubscription, RabbitPublisher rabbitPublisher) { return IntegrationFlow.from(rabbitSubscription) .handle(message -> rabbitPublisher.publish((Message<GreetingMessage>) message)) .get(); } } }