
Roll back a message in Apache Camel using the Spring framework

Workflow of my application: I have set up ActiveMQ and written a Spring application that listens to an ActiveMQ queue. Whenever there is a message in the queue, the listener dequeues it and runs my business logic.

Now I am testing the failure case: if there is any runtime error in my business logic, the message should be rolled back to the queue so that the consumer can consume it again and rerun the business logic.

How can I achieve this with spring-camel?

The code I have written for ActiveMqConsumer

public class ActiveMqConsumer {
public static void main(String[] args){
    try {
        PropertyConfigurator.configure("C:/Users/awsdemo/src/main/resources/log4j.properties");
        ApplicationContext springcontext = new FileSystemXmlApplicationContext("C:/Users/awsdemo/src/main/resources/activecamel.xml");
        CamelContext context = springcontext.getBean("activeContext", CamelContext.class);
        //context.addComponent("activemq", activeMQComponent("tcp://localhost:61616?broker.persistent=false"));
        context.start();
        //Thread.sleep(1000);
        //context.stop();

    } catch ( Exception e ) {
        System.out.println(e);
    }

}
}

The code for ActiveMQRouterBuilder

public class ActiveMQRouterBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
    String activeMqURI = "activemq:queue:ThermalMap";
    System.out.print(activeMqURI);
    from( activeMqURI).to("bean:activemqProcessor?method=processMessage");

}
}

The code for ActiveMQProcessor

public class ActiveMQProcessor{
public void processMessage(Exchange exchange) throws Exception{
    System.out.println("\ninside processMessage :Consumer1");
    //System.out.println(exchange.getIn().getBody());
    Object object = exchange.getIn().getBody();
    FunctionNames functionNamesObject=new FunctionNames();
    //Call integration function to execute .exe file
    try {
                 /* my business logic*/
    } catch (IOException e) {
                     /* message should rollback here to activemq*/

        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
                     /* or message should rollback here to activemq*/
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    System.out.println("ActiveMQProcessor: finished");
}

}

The three files above combine to act as the consumer. They are configured in activecamel.xml, which contains the following:

<camelContext id="activeContext" xmlns="http://camel.apache.org/schema/spring">
    <routeBuilder ref="activeMQRouter" />
</camelContext>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
</bean>

<bean id="activeMQRouter" class="main.java.com.aranin.activemq.ActiveMQRouterBuilder"/>

<bean id="activemqProcessor" class="main.java.com.aranin.activemq.ActiveMQProcessor"/>

In ActiveMQProcessor I have written my business logic. If there is an error, it ends up in one of the catch blocks, and that is where I think the code to roll back the message should go. What code should I write there to roll back the message?

niren asked May 09 '26 19:05


1 Answer

If you use transacted() in the route, then you just need to throw an exception from the ActiveMQProcessor and Camel will automatically roll back the transaction.

  from( activeMqURI)
    .transacted()      
    .to("bean:activemqProcessor?method=processMessage");
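For the rollback to trigger, the exception has to leave the processor instead of being swallowed in a catch block. A minimal sketch of that idea follows. The class name RollbackAwareProcessor and the helper runBusinessLogic are hypothetical stand-ins, and the method takes the body as a plain Object only to keep the sketch free of Camel imports; the original Exchange parameter would behave the same way as far as exceptions are concerned.

```java
import java.io.IOException;

public class RollbackAwareProcessor {

    // Hypothetical placeholder for the real business logic; it fails
    // with an IOException when there is no payload to work on.
    protected void runBusinessLogic(Object body) throws IOException, InterruptedException {
        if (body == null) {
            throw new IOException("no payload to process");
        }
    }

    // Called from the route. Any exception that escapes this method
    // reaches Camel, which (in a transacted route) marks the
    // transaction for rollback so the broker can redeliver the message.
    public void processMessage(Object body) throws Exception {
        try {
            runBusinessLogic(body);
        } catch (InterruptedException e) {
            // Restore the interrupt flag, then rethrow so the failure
            // is still visible to the caller.
            Thread.currentThread().interrupt();
            throw e;
        }
        // IOException is deliberately not caught here: it propagates.
        System.out.println("processMessage: finished");
    }
}
```

The point is that no rollback call is written by hand: the catch blocks from the question either disappear or rethrow.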

You also need to configure the ActiveMQ component as transacted:

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
    <property name="transacted" value="true"/>
</bean>

You should also set up a JMS transaction manager. See more details at: http://camel.apache.org/transactional-client.html

As for the transaction manager, Camel ought to set one up by default if you have transacted=true. But it's still best to define a transaction manager in the XML file and refer to it in the activemq configuration. All the details are at the link above.
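As a rough sketch of what an explicitly defined transaction manager could look like in activecamel.xml: the bean ids jmsConnectionFactory and jmsTransactionManager are illustrative names, not taken from the question, and the broker URL is copied from the existing configuration. Treat this as a starting point under those assumptions and check it against the linked page.

```xml
<!-- Shared connection factory (bean id is an assumed name) -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" />
</bean>

<!-- Spring JMS transaction manager bound to that connection factory -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<!-- ActiveMQ component that refers to the transaction manager -->
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jmsTransactionManager" />
</bean>
```

Here the component takes its connection from the shared factory instead of the brokerURL property, so the transaction manager and the consumer use the same factory.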

And if you have a copy of the Camel in Action book, read chapter 9.

Claus Ibsen answered May 12 '26 10:05


