Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save bharath-c/b8095f7d86267330fd648feee6f6ba56 to your computer and use it in GitHub Desktop.
Save bharath-c/b8095f7d86267330fd648feee6f6ba56 to your computer and use it in GitHub Desktop.

Revisions

  1. @jonmcewen jonmcewen created this gist Dec 16, 2015.
    104 changes: 104 additions & 0 deletions springboot-camel-jms-jdbc-tx.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,104 @@
    import javax.jms.ConnectionFactory;
    import javax.sql.DataSource;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;
    import org.apache.activemq.camel.component.ActiveMQComponent;
    import org.apache.activemq.pool.PooledConnectionFactory;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.spring.spi.SpringTransactionPolicy;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.transaction.ChainedTransactionManager;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    import org.springframework.jms.connection.JmsTransactionManager;
    import org.springframework.transaction.support.TransactionTemplate;

    @SpringBootApplication
    public class MyApplication {

    public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
    }

    /**
    *
    * @return ActiveMQ connection factory with connection pooling and perpetual
    * redelivery
    */
    @Bean
    public ConnectionFactory activemqConnectionFactory() {
    RedeliveryPolicy rp = new RedeliveryPolicy();
    rp.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL("tcp://localhost:61616?jms.prefetchPolicy.all=1");
    connectionFactory.setUserName("admin");
    connectionFactory.setPassword("admin");
    connectionFactory.setRedeliveryPolicy(rp);

    PooledConnectionFactory pcf = new PooledConnectionFactory(connectionFactory);
    return pcf;
    }

    @Bean
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
    return new JmsTransactionManager(connectionFactory);
    }

    @Bean
    public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) {
    return new DataSourceTransactionManager(dataSource);
    }

    @Bean
    public ChainedTransactionManager transactionManager(JmsTransactionManager jmsTx, DataSourceTransactionManager dsTx) {
    return new ChainedTransactionManager(jmsTx, dsTx);
    }

    @Bean
    public TransactionTemplate transactionTemplate(ChainedTransactionManager tm) {
    return new TransactionTemplate(tm);
    }

    /*
    * Camel will look this up, and therefore get the right
    * PlatformTransactionManager, via the TransactionTemplate
    */
    @Bean
    public SpringTransactionPolicy PROPAGATION_REQUIRED(TransactionTemplate transactionManager) {
    return new SpringTransactionPolicy(transactionManager);
    }

    /**
    * @return activemq camel component
    */
    @Bean
    public ActiveMQComponent activemq(ConnectionFactory connectionFactory, JmsTransactionManager jmsTransactionManager) {
    ActiveMQComponent activeMqComponent = new ActiveMQComponent();
    activeMqComponent.setTestConnectionOnStartup(true);
    activeMqComponent.setTransacted(true);
    activeMqComponent.setConnectionFactory(connectionFactory);
    activeMqComponent.setTransactionManager(jmsTransactionManager);

    return activeMqComponent;
    }

    @Bean
    public RouteBuilder stockAdjusterBlacklistRoute(JdbcTemplate jdbcTemplate) {
    return new RouteBuilder() {

    @Override
    public void configure() throws Exception {
    // route that will roll back database update and return message
    // to queue on exception
    from("activemq://myQueue").routeId("myRoute").transacted().bean("databaseUpdater").bean("thingThatThrowsException");

    }

    };
    }

    }