I seem to be losing my 'in' body after reading it once. Note that I am using Camel's stream caching, and that the input is a json file from the http component. I have a processor with the following code.
log.debug("Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName());
log.debug("In msg1:" + exchange.getIn().getBody(String.class));
log.debug("In msg2:" + exchange.getIn().getBody(String.class));
What I'd expect to see here is that msg1 and msg2 are the same output, However msg2 returns a blank string (not null). Here are the logs at TRACE level.
1- DEBUG com.mycompany.MyProcessor : Body Type: org.apache.camel.converter.stream.InputStreamCache
2- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef
3- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String]
4- DEBUG com.mycompany.MyProcessor : In msg1:{myJson}
5- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef
6- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String]
7- DEBUG com.mycompany.MyProcessor : In msg2:
Things to note from the logs:
So where did msg2 go?
EDIT
Some things to mention in addition to Peter's answer below:
Camel's MessageHelper static class has two useful functions:
resetStreamCache
extractBodyAsString
Both of which will help for this situation
Working example for me:
from("direct:secondRoute")
.to("callOtherService")
.log("RESPONSE: ${body}")
.process(exchange -> {
InputStream in = exchange.getIn().getBody(InputStream.class);
in.reset();
});
from("direct:firstRoute")
.to(direct:secondRoute)
.convertBodyTo(String.class);
Using stream cache allows you to read streams more than once in different processors but still only once in the same processor.
I tested with:
ModelCamelContext context = new DefaultCamelContext();
context.setStreamCaching(true); //!!
// ...
from("direct:start")
.to("http://ip.jsontest.com/?callback=showMyIP")
.process(new MyProcessor())
.process(new MyProcessor());
And:
public class MyProcessor implements Processor {
private static final Logger LOG = LoggerFactory.getLogger(HttpStreamCache.MyProcessor.class);
@Override
public void process(final Exchange exchange) throws Exception {
LOG.info("***** Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName());
LOG.info("***** In msg1 : " + exchange.getIn().getBody(String.class));
LOG.info("***** In msg2 : " + exchange.getIn().getBody(String.class));
}
};
This prints:
INFO ***** Body Type: org.apache.camel.converter.stream.InputStreamCache
INFO ***** In msg1 : showMyIP({"ip": "00.000.000.00"});
INFO ***** In msg2 :
INFO ***** Body Type: org.apache.camel.converter.stream.InputStreamCache
INFO ***** In msg1 : showMyIP({"ip": "00.000.000.00"});
INFO ***** In msg2 :
I believe you problem is not the input stream is gone, but rather that it's reader position is at the last point of consumption, in this case, at the end of the stream.
you could use a stream "rewind" method that could bring your reader to the beginning of the stream - only if the stream supports it.
Here's a scala code that provides such a resetter:
object StreamResetter extends Processor {
import org.apache.camel.util.MessageHelper
import org.apache.camel.Exchange
override def process(exchange: Exchange): Unit = MessageHelper.resetStreamCache(exchange.getIn)
}
than, on your route, just before the second log call, you can use
process(StreamResetter)
Regards.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With