Apache Camel – JDBC with Spring Transaction support
Continuing my journey with Fuse ESB (ServiceMix) and Apache Camel, I will present a small example of how to route your messages in a transacted way using the Spring transaction manager.
For the purpose of this tutorial we are going to define a datasource pointing to an in-memory HSQLDB instance and wire it into ‘org.springframework.jdbc.datasource.DataSourceTransactionManager’. Our Camel example is based on the tutorial from the FuseSource website itself, which can be found at http://fusesource.com/docs/router/2.5/transactions/index.html.
Our application will do the following:
- Create a database table in the in-memory HSQLDB database and populate it with some data; we are going to use JdbcTemplate for this purpose
- The route itself will pick up messages from the specified directory, one by one; one of the messages contains data that causes an exception, which rolls back our transaction and its database changes
- After each message is processed successfully, a dump of our database is printed to the console
- If an exception occurs and the transaction is rolled back, an appropriate log entry is displayed, stating the current database content
The full source code for this example, packaged as a Maven-based project, can be downloaded from GitHub.
1. We are going to use two Spring beans for our example. The first one has the datasource injected through its constructor and populates our database:
```java
package tutorial.jdbc;

import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;

public class CreateTable {
    private static Logger log = Logger.getLogger(CreateTable.class);

    protected DataSource dataSource;
    protected JdbcTemplate jdbc;

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public CreateTable(DataSource ds) {
        log.info("CreateTable constructor called");
        setDataSource(ds);
        setUpTable();
    }

    public void setUpTable() {
        log.info("About to set up table...");
        jdbc = new JdbcTemplate(dataSource);
        jdbc.execute("create table accounts (name varchar(50), amount int)");
        jdbc.update("insert into accounts (name,amount) values (?,?)",
                new Object[] {"Major Clanger", 2000}
        );
        jdbc.update("insert into accounts (name,amount) values (?,?)",
                new Object[] {"Tiny Clanger", 100}
        );
        log.info("Table created");
    }
}
```
The second bean is responsible for our route logic and performs the database operations. Its datasource is injected through a property (see the camel-context.xml file below for more details):
```java
package tutorial.jdbc;

import java.util.List;

import javax.sql.DataSource;

import org.apache.camel.Exchange;
import org.apache.camel.language.XPath;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.JdbcTemplate;

public class AccountService {
    private static Logger log = Logger.getLogger(AccountService.class);
    private JdbcTemplate jdbc;

    public AccountService() {
    }

    public void setDataSource(DataSource ds) {
        jdbc = new JdbcTemplate(ds);
    }

    /**
     * Adds a specific amount of money to a named account
     *
     * @param name - account name
     * @param amount - amount to add
     */
    public void credit(@XPath("/transaction/transfer/receiver/text()") String name,
                       @XPath("/transaction/transfer/amount/text()") String amount) {
        log.info("credit() called with args name = " + name + " and amount = " + amount);
        int origAmount = jdbc.queryForInt("select amount from accounts where name = ?",
                new Object[] { name });
        int newAmount = origAmount + Integer.parseInt(amount);
        jdbc.update("update accounts set amount = ? where name = ?",
                new Object[] { newAmount, name });
    }

    /**
     * Subtract a specific amount of money from a named account.
     *
     * @param name - account name
     * @param amount - amount to subtract
     */
    public void debit(@XPath("/transaction/transfer/sender/text()") String name,
                      @XPath("/transaction/transfer/amount/text()") String amount) {
        log.info("debit() called with args name = " + name + " and amount = " + amount);
        int iamount = Integer.parseInt(amount);
        // A single debit may not exceed 100
        if (iamount > 100) {
            throw new IllegalArgumentException("Debit limit is 100");
        }
        int origAmount = jdbc.queryForInt("select amount from accounts where name = ?",
                new Object[] { name });
        int newAmount = origAmount - iamount;
        // The account may not be overdrawn
        if (newAmount < 0) {
            throw new IllegalArgumentException("Not enough in account");
        }
        jdbc.update("update accounts set amount = ? where name = ?",
                new Object[] { newAmount, name });
    }

    /** Replaces the message body with the current content of the accounts table. */
    public void dumpTable(Exchange ex) {
        log.info("dump() called");
        List<?> dump = jdbc.queryForList("select * from accounts");
        ex.getIn().setBody(dump.toString());
    }
}
```
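The @XPath annotations on credit() and debit() tell Camel to evaluate the given expression against the message body and pass the result in as the method argument. To get a feeling for what those expressions select, here is a minimal sketch that uses only the JDK's javax.xml.xpath API, without Camel. The sample message is an assumption: only the element names are taken from the expressions above, while the values and the class name TransferXPathDemo are made up for illustration.

```java
import java.io.StringReader;

import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.xml.sax.InputSource;

class TransferXPathDemo {

    // Hypothetical sample message; only the element names come from the @XPath expressions.
    static final String SAMPLE_MESSAGE =
            "<transaction>"
          + "<transfer>"
          + "<sender>Major Clanger</sender>"
          + "<receiver>Tiny Clanger</receiver>"
          + "<amount>90</amount>"
          + "</transfer>"
          + "</transaction>";

    // Evaluates one XPath expression against the XML text and returns the result as a String.
    static String extract(String xml, String expression) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            return xpath.evaluate(expression, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    public static void main(String[] args) {
        // The same expressions that AccountService uses for its method parameters
        System.out.println("sender   = " + extract(SAMPLE_MESSAGE, "/transaction/transfer/sender/text()"));
        System.out.println("receiver = " + extract(SAMPLE_MESSAGE, "/transaction/transfer/receiver/text()"));
        System.out.println("amount   = " + extract(SAMPLE_MESSAGE, "/transaction/transfer/amount/text()"));
    }
}
```

Note that the amount arrives as a String, which is why AccountService calls Integer.parseInt() before doing any arithmetic.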
Our datasource, transaction manager and injection points for our beans are defined in the camel-context.xml file located in the ‘resources/META-INF/spring’ directory:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <package>tutorial</package>
    </camelContext>

    <!-- spring transaction manager -->
    <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- datasource to the database -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
        <property name="driverClass" value="org.hsqldb.jdbcDriver"/>
        <property name="url" value="jdbc:hsqldb:mem:camel"/>
        <property name="username" value="sa"/>
        <property name="password" value=""/>
    </bean>

    <!-- Bean to initialize table in the DB -->
    <bean id="createTable" class="tutorial.jdbc.CreateTable">
        <constructor-arg ref="dataSource" />
    </bean>

    <!-- Bean for account service -->
    <bean id="accountService" class="tutorial.jdbc.AccountService">
        <property name="dataSource" ref="dataSource"/>
    </bean>

</beans>
```
2 and 3. With our configuration and the Spring beans in place, we can start coding our route:
```java
//our route definition
//noop option - if true, the file is not moved or deleted in any way
from("file:src/data?noop=true")
    //mark the route as transacted
    .transacted()
    //execute spring bean methods
    .beanRef("accountService","credit")
    .beanRef("accountService","debit")
    .beanRef("accountService","dumpTable")
    //log the result
    .to("log:ExampleRouter");
```
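Notice that the route calls credit() before debit(), so when debit() rejects a message the credit has already been written and has to be undone by the rollback. The following plain-Java sketch simulates that all-or-nothing behaviour with an in-memory map instead of a database. It is only an illustration of the effect, not of how Spring or HSQLDB implement it: the class name RollbackSketch, the transfer() helper and the snapshot-and-restore trick are assumptions made for this example, while the starting balances and the debit rules are copied from CreateTable and AccountService.

```java
import java.util.HashMap;
import java.util.Map;

class RollbackSketch {

    // In-memory stand-in for the accounts table, seeded with the same rows as CreateTable.
    static Map<String, Integer> newAccounts() {
        Map<String, Integer> accounts = new HashMap<>();
        accounts.put("Major Clanger", 2000);
        accounts.put("Tiny Clanger", 100);
        return accounts;
    }

    static void credit(Map<String, Integer> accounts, String name, int amount) {
        accounts.put(name, accounts.get(name) + amount);
    }

    // Same rules as AccountService.debit(): limit of 100 per debit, no overdraft.
    static void debit(Map<String, Integer> accounts, String name, int amount) {
        if (amount > 100) {
            throw new IllegalArgumentException("Debit limit is 100");
        }
        int newAmount = accounts.get(name) - amount;
        if (newAmount < 0) {
            throw new IllegalArgumentException("Not enough in account");
        }
        accounts.put(name, newAmount);
    }

    // Runs credit then debit as one unit: returns true on "commit", false on "rollback".
    static boolean transfer(Map<String, Integer> accounts, String sender, String receiver, int amount) {
        Map<String, Integer> snapshot = new HashMap<>(accounts); // simulated transaction start
        try {
            credit(accounts, receiver, amount);
            debit(accounts, sender, amount);
            return true;
        } catch (IllegalArgumentException e) {
            // Simulated rollback: throw away everything done since the snapshot
            accounts.clear();
            accounts.putAll(snapshot);
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> accounts = newAccounts();
        System.out.println("transfer 90 committed:  " + transfer(accounts, "Major Clanger", "Tiny Clanger", 90));
        System.out.println("transfer 150 committed: " + transfer(accounts, "Major Clanger", "Tiny Clanger", 150));
        System.out.println("Major Clanger = " + accounts.get("Major Clanger"));
        System.out.println("Tiny Clanger  = " + accounts.get("Tiny Clanger"));
    }
}
```

In the second transfer the credit to Tiny Clanger succeeds first and is only undone because the debit fails afterwards, which is exactly the situation the transacted route protects us from.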
4. As a simple error handling mechanism I have decided to use ‘onException’, which is executed every time an IllegalArgumentException is thrown in our route and marks the transaction for rollback:
```java
//handle exceptions and log them
onException(IllegalArgumentException.class)
    .maximumRedeliveries(0)
    .handled(true)
    .beanRef("accountService", "dumpTable")
    .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
    .markRollbackOnly();
```
Remember to extend SpringRouteBuilder for our transacted route instead of the plain RouteBuilder class. Below is the full source of our route configuration using the Java DSL:
```java
package tutorial;

import org.apache.camel.spring.SpringRouteBuilder;

/**
 * A Camel Router
 *
 * @version $
 */
public class MyRouteBuilder extends SpringRouteBuilder {

    public void configure() {

        //handle exceptions and log them
        onException(IllegalArgumentException.class)
            .maximumRedeliveries(0)
            .handled(true)
            .beanRef("accountService", "dumpTable")
            .to("file:target/messages?fileName=deadLetters.xml&fileExist=Append")
            .markRollbackOnly();

        //our route definition
        //noop option - if true, the file is not moved or deleted in any way
        from("file:src/data?noop=true")
            //mark the route as transacted
            .transacted()
            //execute spring bean methods
            .beanRef("accountService","credit")
            .beanRef("accountService","debit")
            .beanRef("accountService","dumpTable")
            //log the result
            .to("log:ExampleRouter");
    }
}
```
To run the example, navigate to the project folder and execute the following command:
```shell
mvn camel:run
```