Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Request Reply and Scatter Gather Using Apache Camel

Tags:

apache-camel

I am attempting to construct a route which will do the following:

  1. Consume a message from jms:sender-in. I am using a INOUTrequest reply pattern. The JMSReplyTo = sender-out
  2. The above message will be routed to multiple recipients like jms:consumer1-in, jms:consumer2-in and jms:consumer3-in. All are using a request reply pattern. The JMSReplyTo is specified per consumer ( in this case, the JMSReplyTo are in this order jms:consumer1-out, jms:consumer2-out, jms:consumer3-out
  3. I need to aggregate all the replies together and send the result back to jms:sender-out.

I constructed a route which will resemble this:

from("jms:sender-in")
    .to("jms:consumer1-in?exchangePattern=InOut&replyTo=queue:consumer1-out&preserveMessageQos=true")
    .to("jms:consumer2-in?exchangePattern=InOut&replyTo=queue:consumer2-out&preserveMessageQos=true")
    .to("jms:consumer3-in?exchangePattern=InOut&replyTo=queue:consumer3-out&preserveMessageQos=true");

I then send the replies back to some queue to gather and aggreagte:

from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
from("jms:gather").aggregate(header("TransactionID"), new GatherResponses()).completionSize(3).to("jms:sender-out");

To emulate the behavior of my consumers, I added the following route:

from("jms:consumer1-in").setBody(body());
from("jms:consumer2-in").setBody(body());
from("jms:consumer3-in").setBody(body());

I am getting a couple off issues:

  1. I am getting a timeout error on the replies. If I comment out the gather part, then no issues. Why is there a timeout even though the replies are coming back to the queue and then forwarded to another queue.
  2. How can I store the original JMSReplyTo value so Camel is able to send the aggregated result back to the sender's reply queue.

I have a feeling that I am struggling with some basic concepts. Any help is appreciated. Thanks.

like image 496
techathon Avatar asked Dec 25 '22 05:12

techathon


1 Answers

A good question!

There are two things you need to consider

  1. Don't mix the exchange patterns, Request Reply (InOut) vs Event message (InOnly). (Unless you have a good reason).
  2. If you do a scatter-gather, you need to make the requests multicast, otherwise they will be pipelined which is not really scatter-gather.

I've made two examples which are similar to your case - one with Request Reply and one with (one way) Event messages.

Feel free to replace the activemq component with jms - it's the same thing in these examples.

Example one, using event messages - InOnly:

from("activemq:amq.in")
    .multicast()
        .to("activemq:amq.q1")
        .to("activemq:amq.q2")
        .to("activemq:amq.q3");

from("activemq:amq.q1").setBody(constant("q1")).to("activemq:amq.gather");
from("activemq:amq.q2").setBody(constant("q2")).to("activemq:amq.gather");
from("activemq:amq.q3").setBody(constant("q3")).to("activemq:amq.gather");

from("activemq:amq.gather")
    .aggregate(new ConcatAggregationStrategy())
        .header("breadcrumbId")
        .completionSize(3)
        .to("activemq:amq.out");

from("activemq:amq.out")
    .log("${body}"); // logs "q1q2q3"

Example two, using Request reply - note that the scattering route has to gather the responses as they come in. The result is the same as the first example, but with less routes and less configuration.

from("activemq:amq.in2")
    .multicast(new ConcatAggregationStrategy())
        .inOut("activemq:amq.q4")
        .inOut("activemq:amq.q5")
        .inOut("activemq:amq.q6")
    .end()
    .log("Received replies: ${body}"); // logs "q4q5q6"

from("activemq:amq.q4").setBody(constant("q4"));
from("activemq:amq.q5").setBody(constant("q5"));
from("activemq:amq.q6").setBody(constant("q6"));

As for your question two - of course, it's possible to pass around JMSReplyTo headers and force exchange patterns along the road - but you will create hard to debug code. Keep your exchange patterns simple and clean - it keep bugs away.

like image 63
Petter Nordlander Avatar answered Jan 05 '23 17:01

Petter Nordlander