import javax.jms.ConnectionFactory; import javax.sql.DataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.camel.component.ActiveMQComponent; import org.apache.activemq.pool.PooledConnectionFactory; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spring.spi.SpringTransactionPolicy; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.data.transaction.ChainedTransactionManager; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jms.connection.JmsTransactionManager; import org.springframework.transaction.support.TransactionTemplate; @SpringBootApplication public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } /** * * @return ActiveMQ connection factory with connection pooling and perpetual * redelivery */ @Bean public ConnectionFactory activemqConnectionFactory() { RedeliveryPolicy rp = new RedeliveryPolicy(); rp.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL("tcp://localhost:61616?jms.prefetchPolicy.all=1"); connectionFactory.setUserName("admin"); connectionFactory.setPassword("admin"); connectionFactory.setRedeliveryPolicy(rp); PooledConnectionFactory pcf = new PooledConnectionFactory(connectionFactory); return pcf; } @Bean public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) { return new JmsTransactionManager(connectionFactory); } @Bean public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean public ChainedTransactionManager transactionManager(JmsTransactionManager jmsTx, DataSourceTransactionManager dsTx) { return new ChainedTransactionManager(jmsTx, dsTx); } @Bean public TransactionTemplate transactionTemplate(ChainedTransactionManager tm) { return new TransactionTemplate(tm); } /* * Camel will look this up, and therefore get the right * PlatformTransactionManager, via the TransactionTemplate */ @Bean public SpringTransactionPolicy PROPAGATION_REQUIRED(TransactionTemplate transactionManager) { return new SpringTransactionPolicy(transactionManager); } /** * @return activemq camel component */ @Bean public ActiveMQComponent activemq(ConnectionFactory connectionFactory, JmsTransactionManager jmsTransactionManager) { ActiveMQComponent activeMqComponent = new ActiveMQComponent(); activeMqComponent.setTestConnectionOnStartup(true); activeMqComponent.setTransacted(true); activeMqComponent.setConnectionFactory(connectionFactory); activeMqComponent.setTransactionManager(jmsTransactionManager); return activeMqComponent; } @Bean public RouteBuilder stockAdjusterBlacklistRoute(JdbcTemplate jdbcTemplate) { return new RouteBuilder() { @Override public void configure() throws Exception { // route that will roll back database update and return message // to queue on exception from("activemq://myQueue").routeId("myRoute").transacted().bean("databaseUpdater").bean("thingThatThrowsException"); } }; } }