Apache Camel - JDBC with Spring Transaction support

Apache Camel – JDBC with Spring Transaction support

Continuing my journey with Fuse ESB (ServiceMix) and Apache Camel I will present a small example on how to route your messages in a transacted way using Spring Transaction Manager.
For the purpose of this tutorial we are going to define a datasource to in memory Hsqldb instance and configure it with ‘org.springframework.jdbc.datasource.DataSourceTransactionManager’. Our camel example will be based on the tutorial/document from the FuseSource website itself which could be found at : http://fusesource.com/docs/router/2.5/transactions/index.html.
Our application will do the following:

  1. Create database table in the in-memory hsqldb database and populate it with same data – we are going to use JdbcTemplate for this purpose
  2. The route itself will pick up a messages from the specified directory, one by one, one of the messages will contain data causing an exception and rolling back our transaction and database commit.
  3. After each message is processed sucessfully the dump of our database will be printed to the console
  4. If the exception occurs and the transaction will be rolled back, the appropriate log would be displayed stating the current database content

The full source code for this example, packaged as a maven based project can be downloaded from the Github website.

1. We are going to use 2 spring beans for our example, one with injected datasource in a constructor which will populate our database data:

package tutorial.jdbc;

import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;

public class CreateTable {
    private static Logger log = Logger.getLogger(CreateTable.class);

    protected DataSource dataSource;
    protected JdbcTemplate jdbc;

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }
   
    public CreateTable(DataSource ds) {
        log.info("CreateTable constructor called");
        setDataSource(ds);
        setUpTable();
    }
   
    public void setUpTable() {
        log.info("About to set up table...");
        jdbc = new JdbcTemplate(dataSource);
        jdbc.execute("create table accounts (name varchar(50), amount int)");
        jdbc.update("insert into accounts (name,amount) values (?,?)",
                new Object[] {"Major Clanger", 2000}
        );
        jdbc.update("insert into accounts (name,amount) values (?,?)",
                new Object[] {"Tiny Clanger", 100}
        );
        log.info("Table created");
    }
}

another bean will be responsible for our route logic, performing database operations. Datasource for this bean is injected using a property (see the camel-context.xml file below for more details):

package tutorial.jdbc;

import java.util.List;

import javax.sql.DataSource;

import org.apache.camel.Exchange;
import org.apache.camel.language.XPath;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;

public class AccountService {
    private static Logger log = Logger.getLogger(AccountService.class);
    private JdbcTemplate jdbc;

    public AccountService() {
    }

    public void setDataSource(DataSource ds) {
        jdbc = new JdbcTemplate(ds);
    }

    /**
     * Adds a specific amount of money to a named account
     *
     * @param name - account name
     * @param amount - amount to add
     */

    public void credit(@XPath("/transaction/transfer/receiver/text()") String name,
            @XPath("/transaction/transfer/amount/text()") String amount) {
        log.info("credit() called with args name = " + name + " and amount = " + amount);
        int origAmount = jdbc.queryForInt("select amount from accounts where name = ?", new Object[] { name });
        int newAmount = origAmount + Integer.parseInt(amount);
        jdbc.update("update accounts set amount = ? where name = ?", new Object[] { newAmount, name });
    }

    /**
     * Subtract a specific amount of money from a named account.
     *
     * @param name - account name
     * @param amount - amount to add
     */

    public void debit(@XPath("/transaction/transfer/sender/text()") String name, @XPath("/transaction/transfer/amount/text()") String amount) {
        log.info("debit() called with args name = " + name + " and amount = " + amount);
        int iamount = Integer.parseInt(amount);
        if (iamount > 100) {
            throw new IllegalArgumentException("Debit limit is 100");
        }
        int origAmount = jdbc.queryForInt("select amount from accounts where name = ?", new Object[] { name });
        int newAmount = origAmount - Integer.parseInt(amount);
        if (newAmount < 0) {
            throw new IllegalArgumentException("Not enough in account");
        }

        jdbc.update("update accounts set amount = ? where name = ?", new Object[] { newAmount, name });
    }
    public void dumpTable(Exchange ex) {
        log.info("dump() called");
        List<?> dump = jdbc.queryForList("select * from accounts");
        ex.getIn().setBody(dump.toString());
    }
}

Our datasource, transaction manager and injection points for our beans are defined in camel-context.xml file located in ‘resources/META-INF/spring’ directory:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->
<beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
      http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
      http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">


  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <package>tutorial</package>
  </camelContext>
 
  <!-- spring transaction manager -->
    <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- datasource to the database -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
        <property name="driverClass" value="org.hsqldb.jdbcDriver"/>
        <property name="url" value="jdbc:hsqldb:mem:camel"/>
        <property name="username" value="sa"/>
        <property name="password" value=""/>
    </bean>
   
    <!--  Bean to initialize table in the DB -->
    <bean id="createTable" class="tutorial.jdbc.CreateTable">
        <constructor-arg ref="dataSource" />
    </bean>
   
    <!-- Bean for account service -->
    <bean id="accountService" class="tutorial.jdbc.AccountService">
        <property name="dataSource" ref="dataSource"/>
    </bean>
</beans>

2 and 3. When our configuration and the spring beans are ready we can start coding our route:

//our route definition
        //noop option - if true, the file is not moved or deleted in any way
        from("file:src/data?noop=true")
        //mark the route as transacted
        .transacted()
        //execute spring bean methods
        .beanRef("accountService","credit")
        .beanRef("accountService","debit")
        .beanRef("accountService","dumpTable")
        //log the result
        .to("log:ExampleRouter");

4. For simple error handling mechanism I have decided to use ‘onException’ which will be executed every time an exception will be thrown in our route and transaction will be rolled back

//handle exceptions and log them
        //handle exceptions and log them
        onException(IllegalArgumentException.class).
        maximumRedeliveries(0)
        .handled(true)
        .beanRef("accountService", "dumpTable")
        .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
        .markRollbackOnly();

Remember to use SpringRouteBuilder for our transacted route instead of simple RouteBuilder class, below is the full source of our route configuration using Java DSL:

package tutorial;

import org.apache.camel.spring.SpringRouteBuilder;

/**
 * A Camel Router
 *
 * @version $
 */

public class MyRouteBuilder extends SpringRouteBuilder {


    public void configure() {
        //handle exceptions and log them
        onException(IllegalArgumentException.class).
        maximumRedeliveries(0)
        .handled(true)
        .beanRef("accountService", "dumpTable")
        .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
        .markRollbackOnly();

        //our route definition
        //noop option - if true, the file is not moved or deleted in any way
        from("file:src/data?noop=true")
        //mark the route as transacted
        .transacted()
        //execute spring bean methods
        .beanRef("accountService","credit")
        .beanRef("accountService","debit")
        .beanRef("accountService","dumpTable")
        //log the result
        .to("log:ExampleRouter");
       
    }
}

To run an example, navigate into the project folder and execute the following command:

 mvn camel:run

Leave a Reply