Camel - Stream cache not caching / can't convert?

I seem to be losing my 'in' body after reading it once. Note that I am using Camel's stream caching, and that the input is a JSON file from the HTTP component. I have a processor with the following code:

    log.debug("Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName());
    log.debug("In msg1:"  + exchange.getIn().getBody(String.class));
    log.debug("In msg2:"  + exchange.getIn().getBody(String.class));

I'd expect msg1 and msg2 to be identical. However, msg2 comes back as an empty string (not null). Here are the logs at TRACE level.

1- DEBUG com.mycompany.MyProcessor : Body Type: org.apache.camel.converter.stream.InputStreamCache
2- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef
3- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String]
4- DEBUG com.mycompany.MyProcessor : In msg1:{myJson}
5- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Converting org.apache.camel.converter.stream.InputStreamCache -> java.lang.String with value: org.apache.camel.converter.stream.InputStreamCache@780a5cef
6- TRACE org.apache.camel.impl.converter.DefaultTypeConverter : Using converter: StaticMethodTypeConverter: public static java.lang.String org.apache.camel.converter.IOConverter.toString(java.io.InputStream,org.apache.camel.Exchange) throws java.io.IOException to convert [class org.apache.camel.converter.stream.InputStreamCache=>class java.lang.String]
7- DEBUG com.mycompany.MyProcessor : In msg2:

Things to note from the logs:

  • Line 1- The Body Type is correctly showing a cached input stream
  • Line 4- Converting to String does work to produce msg1, even though line 3, the conversion code, seems to fail with an IOException
  • Line 6- Also failing the conversion but it's important to note that the body is still a cached stream.
  • Line 7- My message is lost.

So where did msg2 go?

EDIT

Some things to mention in addition to Peter's answer below:

Camel's MessageHelper static class has two useful functions:

  • resetStreamCache
  • extractBodyAsString

Both help in this situation.

Roy Truelove asked Dec 10 '14 14:12


3 Answers

Working example for me:

from("direct:secondRoute")
     .to("callOtherService")
     .log("RESPONSE: ${body}")   // logging reads the stream to its end
     .process(exchange -> {
        // Rewind the stream so the calling route can read the body again.
        InputStream in = exchange.getIn().getBody(InputStream.class);
        in.reset();
     });

from("direct:firstRoute")
     .to("direct:secondRoute")
     .convertBodyTo(String.class);

tatka answered Oct 01 '22 12:10


Stream caching lets you read a stream more than once across different processors, but within a single processor it can still be read only once (unless you reset it yourself).

I tested with:

ModelCamelContext context = new DefaultCamelContext();
context.setStreamCaching(true); //!!
// ...

from("direct:start")
    .to("http://ip.jsontest.com/?callback=showMyIP")
    .process(new MyProcessor())
    .process(new MyProcessor());

And:

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyProcessor implements Processor {
    private static final Logger LOG = LoggerFactory.getLogger(MyProcessor.class);

    @Override
    public void process(final Exchange exchange) throws Exception {
        LOG.info("***** Body Type: " + exchange.getIn().getBody().getClass().getCanonicalName());
        // The first conversion reads the stream to its end; the second finds nothing left.
        LOG.info("***** In msg1  : " + exchange.getIn().getBody(String.class));
        LOG.info("***** In msg2  : " + exchange.getIn().getBody(String.class));
    }
}

This prints:

INFO  ***** Body Type: org.apache.camel.converter.stream.InputStreamCache
INFO  ***** In msg1  : showMyIP({"ip": "00.000.000.00"});

INFO  ***** In msg2  : 
INFO  ***** Body Type: org.apache.camel.converter.stream.InputStreamCache
INFO  ***** In msg1  : showMyIP({"ip": "00.000.000.00"});

INFO  ***** In msg2  :  
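
The pattern in this output (body present at the start of each processor, empty on the second read inside it) is what you would get if the stream were rewound between steps but not within a step. Below is a minimal JDK-only model of that behaviour, not Camel's actual implementation: `ResetBetweenStepsModel`, `runSteps`, `readTwicePerStep` and `readRemaining` are hypothetical names made up for the sketch, and a `ByteArrayInputStream` stands in for the cached body.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ResetBetweenStepsModel {

    // Reads everything from the stream's current position to its end.
    public static String readRemaining(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs every step against the same stream, rewinding it before each step
    // (the assumed "reset between processors" behaviour).
    public static void runSteps(ByteArrayInputStream body, List<Consumer<InputStream>> steps) {
        for (Consumer<InputStream> step : steps) {
            body.reset();
            step.accept(body);
        }
    }

    // Runs two identical steps that each read the body twice and records what they saw.
    public static List<String> readTwicePerStep(String payload) {
        ByteArrayInputStream body =
                new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
        List<String> seen = new ArrayList<>();
        Consumer<InputStream> readTwice = in -> {
            seen.add(readRemaining(in)); // full payload
            seen.add(readRemaining(in)); // nothing left: empty string, not null
        };
        runSteps(body, Arrays.asList(readTwice, readTwice));
        return seen;
    }

    public static void main(String[] args) {
        for (String s : readTwicePerStep("showMyIP({\"ip\": \"00.000.000.00\"});")) {
            System.out.println("read: '" + s + "'");
        }
    }
}
```

Each step sees the full payload on its first read and an empty string on its second, mirroring the msg1/msg2 pairs in the log.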

Peter Keller answered Oct 01 '22 11:10


I believe your problem is not that the input stream is gone, but rather that its read position is at the last point of consumption, in this case the end of the stream.

You could use a stream "rewind" method to bring the read position back to the beginning of the stream, provided the stream supports it.
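
To illustrate the read-position point without Camel, here is a minimal JDK-only sketch. It assumes a `ByteArrayInputStream` as a stand-in for the cached body; the class name `StreamRewindDemo`, the helper `readRemaining` and the payload are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StreamRewindDemo {

    // Reads everything from the stream's current position to its end.
    public static String readRemaining(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical payload standing in for the HTTP response body.
        ByteArrayInputStream in = new ByteArrayInputStream(
                "{\"ip\": \"0.0.0.0\"}".getBytes(StandardCharsets.UTF_8));

        String first = readRemaining(in);  // consumes the whole stream
        String second = readRemaining(in); // position is at the end, so this is empty

        // ByteArrayInputStream supports reset(); with no mark set it rewinds to the start.
        // For an arbitrary InputStream, check markSupported() before relying on this.
        in.reset();
        String third = readRemaining(in);  // readable again after the rewind

        System.out.println("first : " + first);
        System.out.println("second: '" + second + "'");
        System.out.println("third : " + third);
    }
}
```

The second read yields an empty string rather than null, which matches the blank msg2 in the question. After `reset()` the content is readable again.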

Here's some Scala code that provides such a resetter:

import org.apache.camel.{Exchange, Processor}
import org.apache.camel.util.MessageHelper

// Rewinds a cached stream body so the next step can read it from the start.
object StreamResetter extends Processor {
  override def process(exchange: Exchange): Unit =
    MessageHelper.resetStreamCache(exchange.getIn)
}

Then, in your route, just before the second log call, you can use:

process(StreamResetter)

Regards.

Marco Aurelio answered Oct 01 '22 13:10