I want to use Apache Camel parallel multicast to 3 routes and aggregate (and wait for) 2 of them while leave 3rd go on its own (3rd one should not block first two). I also need to process these two in "all" cases meaning that if one of them fails (e.g. throws an exception during processing), it should be aggregated as well.
From what I understood from Camel documentation, that behavior should be "default" as long as you don't specify say stopOnException. But what happens is that exchange with exception never gets to my AggregationStrategy. What is kind of weird is that processors behind aggregate get executed even when there is completionSize(2).
So my question is: How is it possible that route after aggregation continues without processing exchange with the exception in AggregationStrategy? And how to solve my case correctly? Please note that doing .to("direct:sync") is not that case because this exception may be thrown from the route that is completely out of multicast-aggregate part of routes.
Here is the sample code (ExceptionThrowingProcessor throws MyException, PropertySetterProcessor is not important):
CamelContext context = new DefaultCamelContext();
RouteBuilder builder = new RouteBuilder() {
@Override
public void configure() throws Exception {
}
};
builder.onException(MyException.class)
.process(new PropertySetterProcessor("a", "onException"))
.handled(true);
builder.from("direct:input")
.process(new PropertySetterProcessor("a", "before-multicast"))
.multicast()
.parallelProcessing()
.shareUnitOfWork()
.to("direct:part1", "direct:part2", "direct:part3")
builder.from("direct:part1")
.process(new PropertySetterProcessor("a", "part1"))
.to("direct:sync");
builder.from("direct:part2")
.process(new PropertySetterProcessor("a", "part2"))
.process(new ExceptionThrowingProcessor("oops")) // throws MyException
.to("direct:sync");
builder.from("direct:part3")
.process(new PropertySetterProcessor("a", "part3"));
// don't want this to be aggregated within direct:sync
builder.from("direct:sync")
.aggregate(new TestAggregationStrategy())
// strategy.aggregate is called only once (from part1) but not from part2 :(
.constant(true)
.completionSize(2)
.process(new PropertySetterProcessor("a", "after-aggregation"));
context.addRoutes(builder);
context.start();
ProducerTemplate template = context.createProducerTemplate();
template.send("direct:input", new DefaultExchange(context, ExchangePattern.InOut));
Replace handled(true) with continue(true) on onException block. Refer this for more info http://camel.apache.org/exception-clause.html
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With