
Apache Camel aggregate route with exception

I want to use Apache Camel's parallel multicast to send to 3 routes, then aggregate (and wait for) 2 of them while letting the 3rd go on its own (the 3rd one should not block the first two). I also need to process these two in all cases, meaning that if one of them fails (e.g. throws an exception during processing), it should be aggregated as well.

From what I understood from the Camel documentation, that should be the default behavior as long as you don't specify, say, stopOnException. But what actually happens is that the exchange with the exception never gets to my AggregationStrategy. What is weird is that the processors after the aggregate get executed even though completionSize(2) is set.

So my question is: how is it possible that the route continues after the aggregation without the exchange with the exception being processed in the AggregationStrategy? And how do I solve my case correctly? Please note that simply doing .to("direct:sync") is not an option here, because the exception may be thrown from a route that is completely outside the multicast-aggregate part of the routes.

Here is the sample code (ExceptionThrowingProcessor throws MyException, PropertySetterProcessor is not important):

CamelContext context = new DefaultCamelContext();
RouteBuilder builder = new RouteBuilder() {
    @Override
    public void configure() throws Exception {
    }
};

builder.onException(MyException.class)
        .process(new PropertySetterProcessor("a", "onException"))
        .handled(true);

builder.from("direct:input")
        .process(new PropertySetterProcessor("a", "before-multicast"))
        .multicast()
        .parallelProcessing()
        .shareUnitOfWork()
        .to("direct:part1", "direct:part2", "direct:part3");

builder.from("direct:part1")
        .process(new PropertySetterProcessor("a", "part1"))
        .to("direct:sync");

builder.from("direct:part2")
        .process(new PropertySetterProcessor("a", "part2"))
        .process(new ExceptionThrowingProcessor("oops")) // throws MyException
        .to("direct:sync");

builder.from("direct:part3")
        .process(new PropertySetterProcessor("a", "part3"));
// don't want this to be aggregated within direct:sync

builder.from("direct:sync")
        .aggregate(new TestAggregationStrategy())
        // strategy.aggregate is called only once (from part1) but not from part2 :(
        .constant(true)
        .completionSize(2)
        .process(new PropertySetterProcessor("a", "after-aggregation"));

context.addRoutes(builder);
context.start();

ProducerTemplate template = context.createProducerTemplate();
template.send("direct:input", new DefaultExchange(context, ExchangePattern.InOut));
asked Feb 07 '17 13:02 by zdenda.online

1 Answer

Replace handled(true) with continued(true) in the onException block. See http://camel.apache.org/exception-clause.html for more info.
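As a rough illustration of that change, the onException block and the failing route from the question might look like the sketch below. It has not been run against the asker's setup. The class name ContinuedRoutes, the nested MyException stand-in, and the lambda processors (replacing PropertySetterProcessor and ExceptionThrowingProcessor, which the question does not show) are assumptions made so the snippet compiles on its own with Camel on the classpath.

```java
import org.apache.camel.builder.RouteBuilder;

// Hypothetical class name; only the onException clause and direct:part2 are shown.
public class ContinuedRoutes extends RouteBuilder {

    // Stand-in for the asker's MyException, declared here only for self-containment.
    public static class MyException extends Exception {
        public MyException(String message) {
            super(message);
        }
    }

    @Override
    public void configure() throws Exception {
        // continued(true) asks Camel to treat the exception as handled and
        // resume routing after the processor that failed, instead of
        // ending the route at that point as handled(true) does.
        onException(MyException.class)
            .continued(true)
            // Assumed equivalent of PropertySetterProcessor("a", "onException").
            .process(exchange -> exchange.setProperty("a", "onException"));

        from("direct:part2")
            // Assumed equivalent of ExceptionThrowingProcessor("oops").
            .process(exchange -> {
                throw new MyException("oops");
            })
            // With continued(true) the exchange is expected to still be sent here.
            .to("direct:sync");
    }
}
```

With handled(true), routing for the failed exchange stops once the onException block has run, which would explain why the exchange from part2 never reaches direct:sync and the aggregator in the question.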

answered Oct 06 '22 01:10 by Naga
