I'm trying to send a X?-GB large file as a stream to an ActiveMQ queue for processing.
I know ActiveMQ supports streams, and so does camel-jms, but nothing I try to set on the queue seems to make any difference. The only thing that changes is turning off stream caching results is a "stream closed" exception instead.
I am open to using a processor or custom class (as long as resources get cleaned up), but this should be possible from the blueprint. How do I properly process a large file through camel-activemq without getting an OutOfMemoryError?
Using
Here is my camel blueprint
<?xml version="1.0" encoding="UTF-8"?>
<blueprint
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"
<!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->
<bean id="toStreamBody" class="my.test.toInputStream"/>
<!-- define a bean of type StreamCachingStrategy which CamelContext will automaticly use -->
<bean id="streamStrategy" class="org.apache.camel.impl.DefaultStreamCachingStrategy">
<property name="spoolDirectory" value="${java.io.tmpdir}TestTemp/#uuid#/"/>
<property name="spoolThreshold" value="131072"/>
<property name="spoolUsedHeapMemoryThreshold" value="70"/>
<property name="anySpoolRules" value="true"/>
</bean>
<!-- streamCaching="true" is "not a valid attribute" -->
<camelContext streamCache="true" xmlns="http://camel.apache.org/schema/blueprint">
<route id="file_route">
<from uri="file://FileUploadBin?delete=false&moveFailed=.error"/>
<!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->
<bean ref="toStreamBody"/>
<to uri="activemq:queue:TestQ"/>
</route>
<route id="myTestQ">
<from uri="activemq:queue:TestQ?concurrentConsumers=1&maxConcurrentConsumers=64&maxMessagesPerTask=100&asyncConsumer=true&jmsMessageType=Stream&mapJmsMessage=false"/>
<bean ref="toStreamBody"/>
<log message="FINISHED" loggingLevel="WARN"/>
</route>
</camelContext>
</blueprint>
Here is the error I keep getting
2017-10-17 08:46:53,859 | ERROR | - RecipientList | DefaultErrorHandler | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[file_route ] [file_route ] [file://FileUploadBin?delete=false&moveFailed=.error ] [ 3764]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-DESKTOP-H2O66PO-62468-1508242908251-4-5
ExchangePattern InOnly
Headers {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4, fileName=Die.txt}
BodyType org.apache.camel.converter.stream.FileInputStreamCache
Body [Body is instance of org.apache.camel.StreamCache]
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4]
at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
... 24 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121]
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121]
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121]
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121]
at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121]
at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121]
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121]
at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121]
at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)
at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
2017-10-17 08:46:57,641 | WARN | ://FileUploadBin | GenericFileOnCompletion | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@294a3d48 for file: GenericFile[Die.txt]
The answer is yes.
The ActiveMQ component allows messages to be sent to a JMS Queue or Topic or messages to be consumed from a JMS Queue or Topic using Apache ActiveMQ.
Apache Camel - Message Queues.
This is not really supported on the classic ActiveMQ broker.
However the next generation ActiveMQ Artemis supports large messages, and we have just added support for this in camel-jms as well. I wrote a blog entry about this: http://www.davsclaus.com/2017/10/working-with-large-messages-using.html
And we also added support for the javax.jms.StreamMessage
type in camel-jms. However this API is not as ideal for large messages, so it has limited usage. But nevertheless you can turn it on, on the component with the new option streamMessageTypeEnabled
in Camel 2.21 onwards, and then if the message body is streaming type then Camel will send as StreamMessage
instead of BytesMessage
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With