Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implement Request-Reply pattern using ActiveMQ, Camel and Spring

I am trying to implement the following functionality:

Read CSV file line-by-line then for every line:

  1. Build request based on the values that the line contains
  2. Send the request to the message queue
  3. Other component needs to pick up the message, process the request and send the response to another message queue (known to producer, so the producer could pick up the response).

I believe that the request-reply pattern fits the bill. I installed ActiveMQ, downloaded camel and tried to use their jms project.

After configuring the component, the queue and testing connection (worked), I tried to figure out how actually to implement request-reply? I failed to find any good examples

I have a RouteBuilder

The RouteBuilder

public class MyRouteBuilder extends RouteBuilder {
    public static void main(String[] args) throws Exception {
        new Main().run(args);
    }

    public void configure() {
        from("file:src/data?noop=true")
        .to("activemq:RequestQ");

        from("activemq:RequestQ?exchangePattern=InOut&timeToLive=5000") 
        .inOut("activemq:RequestQ", "bean:myBean?method=someMethod"); 
    }
}

camel-context.xml

<beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="
         http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <package>org.apache.camel.example.spring</package>
    </camelContext>

    <bean id="jmsConnectionFactory" 
        class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="pooledConnectionFactory" 
        class="org.apache.activemq.pool.PooledConnectionFactory" 
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" 
        class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="activemq" 
        class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

    <bean id="myBean" class="org.apache.camel.example.spring.MyBean"/>
</beans>

Questions:

  1. How can I read a file line-by-line construct and post a message based on line content?
  2. How to configure the route and how to configure the message header in order to get response in temporary queue that will be deleted after the response was picked up?
  3. What quick-start guides for the above can you recommend?

EDIT

I got the code below working. Now lets say that in the Processor I create the response. How can I send it back? How can I consume the response?

public class MyRouteBuilder extends RouteBuilder {

    public static void main(String[] args) throws Exception {
        new Main().run(args);
    }

    public void configure() {
        from("file:/Users/aviad/ws/integ/src/data?fileName=lines.txt&noop=true&idempotent=true")
        .split()
        .tokenize("\\n")
        .inOut("activemq:req");

        from("activemq:req")
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange.getIn().getBody(String.class));
                System.out.println("jmscorrelationid=" + exchange.getIn().getHeader("jmscorrelationid"));
                System.out.println("jmsdestination=" + exchange.getIn().getHeader("jmsdestination"));
            }
        });
    }
}
like image 446
aviad Avatar asked Apr 26 '13 18:04

aviad


People also ask

How should I implement request response with JMS?

The best way to implement request-response over JMS is to create a temporary queue and consumer per client on startup, set JMSReplyTo property on each message to the temporary queue and then use a correlationID on each message to correlate request messages to response messages.

What is exchange pattern in Apache Camel?

There are two Message Exchange Patterns you can use in messaging. From there Enterprise Integration Patterns they are: Event Message (or one-way) Request Reply.

What is camel SetHeader?

The SetHeader EIP is used for setting a message header.

What is camel activeMQ?

Apache ActiveMQ is released under the Apache 2.0 License; Apache Camel: A versatile open source integration framework. An open source Java framework that focuses on making integration easier and more accessible to developers.


1 Answers

I just had something similar around, so I alterd it and here it is. Note that the 2nd route does not need to be explicitly aware of a request/reply message, only the producer needs to know that. The 2nd route will reply if there is a reply to destination set (which is handled automagically by camel).

I don't know of any good example, but this doc page is really comprehensive with small examples.

 <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <route>
          <from uri="file://c:/apps/in"/>
          <split>
            <tokenize token="\n"/>
            <to uri="activemq:req" pattern="InOut"/>
            <to uri="stream:out"/><!-- print Hello to console -->
          </split>
        </route>
        <route>
          <from uri="activemq:req"/>
            <transform>
              <simple>Hello ${in.body}</simple>
            </transform>
        </route>
    </camelContext>
like image 98
Petter Nordlander Avatar answered Sep 20 '22 22:09

Petter Nordlander