Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to send large files to ActiveMQ using camel

I'm trying to send a X?-GB large file as a stream to an ActiveMQ queue for processing.

I know ActiveMQ supports streams, and so does camel-jms, but nothing I try to set on the queue seems to make any difference. The only thing that changes is turning off stream caching results is a "stream closed" exception instead.

I am open to using a processor or custom class (as long as resources get cleaned up), but this should be possible from the blueprint. How do I properly process a large file through camel-activemq without getting an OutOfMemoryError?

Using

  • servicemix-7.0.0
  • camel-2.16.4
  • activemq-5.14.3

Here is my camel blueprint

<?xml version="1.0" encoding="UTF-8"?>
<blueprint
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"

    <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->    
    <bean id="toStreamBody" class="my.test.toInputStream"/>

    <!-- define a bean of type StreamCachingStrategy which CamelContext will automaticly use -->
    <bean id="streamStrategy" class="org.apache.camel.impl.DefaultStreamCachingStrategy">
        <property name="spoolDirectory" value="${java.io.tmpdir}TestTemp/#uuid#/"/>
        <property name="spoolThreshold" value="131072"/>
        <property name="spoolUsedHeapMemoryThreshold" value="70"/>
        <property name="anySpoolRules" value="true"/>
    </bean>

    <!-- streamCaching="true" is "not a valid attribute" -->
    <camelContext streamCache="true" xmlns="http://camel.apache.org/schema/blueprint">

        <route id="file_route">
            <from uri="file://FileUploadBin?delete=false&amp;moveFailed=.error"/>
            <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->
            <bean ref="toStreamBody"/>
            <to uri="activemq:queue:TestQ"/>
        </route>

        <route id="myTestQ">
            <from uri="activemq:queue:TestQ?concurrentConsumers=1&amp;maxConcurrentConsumers=64&amp;maxMessagesPerTask=100&amp;asyncConsumer=true&amp;jmsMessageType=Stream&amp;mapJmsMessage=false"/>
            <bean ref="toStreamBody"/>
            <log message="FINISHED" loggingLevel="WARN"/>
        </route>

    </camelContext>
</blueprint>

Here is the error I keep getting

2017-10-17 08:46:53,859 | ERROR |  - RecipientList | DefaultErrorHandler              | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[file_route        ] [file_route        ] [file://FileUploadBin?delete=false&moveFailed=.error                           ] [      3764]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-DESKTOP-H2O66PO-62468-1508242908251-4-5
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4, fileName=Die.txt}
        BodyType            org.apache.camel.converter.stream.FileInputStreamCache
        Body                [Body is instance of org.apache.camel.StreamCache]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
        ... 24 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121]
        at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121]
        at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121]
        at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121]
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)
        at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
2017-10-17 08:46:57,641 | WARN  | ://FileUploadBin | GenericFileOnCompletion          | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@294a3d48 for file: GenericFile[Die.txt]
like image 865
Tezra Avatar asked Oct 17 '17 13:10

Tezra


People also ask

Can large files be sent over ActiveMQ?

The answer is yes.

What is ActiveMQ in Apache Camel?

The ActiveMQ component allows messages to be sent to a JMS Queue or Topic or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.

Is Apache Camel A message queue?

Apache Camel - Message Queues.


1 Answers

This is not really supported on the classic ActiveMQ broker.

However the next generation ActiveMQ Artemis supports large messages, and we have just added support for this in camel-jms as well. I wrote a blog entry about this: http://www.davsclaus.com/2017/10/working-with-large-messages-using.html

And we also added support for the javax.jms.StreamMessage type in camel-jms. However this API is not as ideal for large messages, so it has limited usage. But nevertheless you can turn it on, on the component with the new option streamMessageTypeEnabled in Camel 2.21 onwards, and then if the message body is streaming type then Camel will send as StreamMessage instead of BytesMessage.

like image 194
Claus Ibsen Avatar answered Sep 28 '22 20:09

Claus Ibsen