Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apache camel - how to "wiretap" synchronously? Or just send a copy of an exchange?

I have an apache camel route which is processing a POJO on the exchange body.

Please look at the sequences of lines marked from 1 to 3.

    from("direct:foo")
        .to("direct:doSomething")         // 1 (POJO on the exchange body)
        .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
        .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
    ;

Now I need to use the put operation on the hazelcast component which unfortunately needs to set the body to the value -1.

    from("direct:storeInHazelcast")
            .setBody(constant(-1))
            .setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
            .setHeader(HazelcastConstants.OBJECT_ID, constant(LAST_FLIGHT_UPDATE_SEQ))
            .to("hazelcast:map:MyNumber")
    ;

For the line marked 2, I would like to send a COPY of the exchange to the storeInHazelcast route.

Firstly, I tried .multicast(), but the exchange body was still screwed up (to -1).

        // shouldnt this copy the exchange?
        .multicast().to("direct:storeInHazelcast").end()

Then I tried .wireTap(), which works as a "fire and forget" (async) mode, but I actually need it to block, and wait for it to complete. Can you make wireTap block?

        // this works but I need it to be sync processing (not async)
        .wireTap("direct:storeInHazelcast").end()

So I'm looking for some tips here. As far as I can read, multicast() should have copied the exchange, but the setBody() in my storeInHazelcast route seens to screw up the original exchange.

Alternatively maybe there is some other way to do this.

Thanks in advance. Camel 2.10

like image 833
vikingsteve Avatar asked Jan 07 '14 10:01

vikingsteve


2 Answers

I think I have stumbled on to the answer, line 2 can use enrich() from the dsl like this:

    .enrich("direct:storeInHazelcast", new KeepOriginalAggregationStrategy())

where:

public class KeepOriginalAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        return oldExchange;
    }
}

Interestingly, I found an aggregation strategy named UseOriginalAggregationStrategy(), but I couldn't see how to specify the parameter named Exchange original from the DSL.

    .enrich("direct:storeInHazelcast",
        new UseOriginalAggregationStrategy(???, false))

In absence of some sort of getExchange() method in the dsl, I can't see how to use this aggregation strategy here (but if anyone can advise how, please do).

like image 158
vikingsteve Avatar answered Nov 03 '22 01:11

vikingsteve


You can do without writing your own aggregation strategy by using

.enrich("direct:storeInHazelcast", AggregationStrategies.useOriginal())
like image 28
Arek Bazylewicz Avatar answered Nov 02 '22 23:11

Arek Bazylewicz