I'm experimenting with Spring, Camel and ActiveMQ to simulate simple messaging pattern with two distinct JMS brokers. Here is the route I'm trying to use.
@Component("testRouteBuilder")
public class CamelRouteBuilder extends RouteBuilder {
@Autowired
@Qualifier("jmscf2")
private ConnectionFactory jmsServer2;
@Override
public void configure() throws Exception {
from("timer://foo?delay=2000")
.setBody(simple("hello"))
.log("request: ${body}")
.to("bean://jmsbean")
.log("reply: ${body}");
from("jms1://queue:dest")
.log("got message: ${body}")
.log("headers: ${headers}")
.setBody(constant("reply"))
.process(new Processor() {
@Override
public void process(Exchange arg0) throws Exception {
Connection connection = jmsServer2.createConnection();
connection.start();
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
TextMessage inMsg = arg0.getIn().getBody(TextMessage.class);
MessageProducer producer = session.createProducer(inMsg.getJMSReplyTo());
TextMessage outMsg = session.createTextMessage();
outMsg.setJMSCorrelationID(inMsg.getJMSCorrelationID());
outMsg.setText("reply");
producer.send(outMsg);
session.close();
connection.close();
}
});
}
@Component("jmsbean")
public static class JmsBean {
@Autowired
@Qualifier("jmscf1")
ConnectionFactory jmsServer1;
@Autowired
@Qualifier("jmscf2")
ConnectionFactory jmsServer2;
public String testJms(@Body String body) throws JMSException {
Connection conn = jmsServer1.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Connection conn2 = jmsServer2.createConnection();
conn2.start();
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session2.createTemporaryQueue();
TextMessage message = session.createTextMessage();
message.setJMSCorrelationID("tuomas");
message.setJMSReplyTo(tempQueue);
message.setJMSMessageID("tuomas");
Queue dest = session.createQueue("dest");
MessageProducer producer = session.createProducer(dest);
producer.send(message);
session.close();
conn.close();
MessageConsumer consumer = session2.createConsumer(tempQueue, "tuomas");
Message reply = consumer.receive();
session2.close();
conn2.close();
return reply.getBody(String.class);
}
}
}
And here are the bean components
@Configuration
@Profile("local")
@ImportResource("classpath:META-INF/spring/app-ctx.xml")
public class BeanDeclarations {
@Autowired
private CamelContext ctx;
@Bean("jms1")
@Primary
@Qualifier("jms1Component")
public JmsComponent jms1Component() {
return JmsComponent.jmsComponentAutoAcknowledge(jms1());
}
@Bean("jms2")
@Qualifier("jms2Component")
public JmsComponent jms2Component() {
return JmsComponent.jmsComponentAutoAcknowledge(jms2());
}
@Bean("jmscf1")
@Primary
@Qualifier("jmscf1")
public ConnectionFactory jms1() {
return new ActiveMQConnectionFactory("vm://localhost:7777");
}
@Bean("jmscf2")
@Qualifier("jmscf2")
public ConnectionFactory jms2() {
return new ActiveMQConnectionFactory("vm://localhost:7778");
}
@Bean
public Object propertiesComponent() {
PropertiesComponent component = ctx.getComponent("properties", PropertiesComponent.class);
component.setLocation("classpath://application.properties");
return new Object();
}
}
Trying to run the following example produces the following error
2018-04-07 15:34:55.554 INFO 7080 --- [ main] o.a.camel.spring.SpringCamelContext : Apache Camel 2.21.0 (CamelContext: camel-1) is starting
2018-04-07 15:34:55.554 INFO 7080 --- [ main] o.a.c.m.ManagedManagementStrategy : JMX is enabled
2018-04-07 15:34:55.704 INFO 7080 --- [ main] o.a.c.i.converter.DefaultTypeConverter : Type converters loaded (core: 193, classpath: 11)
2018-04-07 15:34:55.867 INFO 7080 --- [ main] o.a.camel.spring.SpringCamelContext : StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2018-04-07 15:34:55.929 INFO 7080 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route1 started and consuming from: timer://foo?delay=2000
2018-04-07 15:34:56.054 INFO 7080 --- [ main] o.apache.activemq.broker.BrokerService : Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\Tuomas\Desktop\springcamel\activemq-data\localhost\KahaDB]
2018-04-07 15:34:56.085 INFO 7080 --- [ JMX connector] o.a.a.broker.jmx.ManagementContext : JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
2018-04-07 15:34:56.132 INFO 7080 --- [ main] o.a.a.store.kahadb.MessageDatabase : KahaDB is version 6
2018-04-07 15:34:56.179 INFO 7080 --- [ main] o.a.a.store.kahadb.plist.PListStoreImpl : PListStore:[C:\Users\Tuomas\Desktop\springcamel\activemq-data\localhost\tmp_storage] started
2018-04-07 15:34:56.320 INFO 7080 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.15.3 (localhost, ID:DESKTOP-LI5P50P-55150-1523104496195-0:1) is starting
2018-04-07 15:34:56.398 INFO 7080 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.15.3 (localhost, ID:DESKTOP-LI5P50P-55150-1523104496195-0:1) started
2018-04-07 15:34:56.398 INFO 7080 --- [ main] o.apache.activemq.broker.BrokerService : For help or more information please see: http://activemq.apache.org
2018-04-07 15:34:56.398 INFO 7080 --- [ main] o.a.activemq.broker.TransportConnector : Connector vm://localhost started
2018-04-07 15:34:56.429 INFO 7080 --- [ main] o.a.camel.spring.SpringCamelContext : Route: route2 started and consuming from: jms1://queue:dest
2018-04-07 15:34:56.429 INFO 7080 --- [ main] o.a.camel.spring.SpringCamelContext : Total 2 routes, of which 2 are started
2018-04-07 15:34:56.429 INFO 7080 --- [ main] o.a.camel.spring.SpringCamelContext : Apache Camel 2.21.0 (CamelContext: camel-1) started in 0.875 seconds
2018-04-07 15:34:56.460 INFO 7080 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2018-04-07 15:34:56.460 INFO 7080 --- [ main] t.springcamel.SpringcamelApplication : Started SpringcamelApplication in 6.249 seconds (JVM running for 11.311)
2018-04-07 15:34:58.431 INFO 7080 --- [2 - timer://foo] route1 : request: hello
2018-04-07 15:34:58.476 INFO 7080 --- [sConsumer[dest]] route2 : got message:
2018-04-07 15:34:58.476 INFO 7080 --- [sConsumer[dest]] route2 : headers: {breadcrumbId=ID-DESKTOP-LI5P50P-1523104493101-0-2, JMSCorrelationID=tuomas, JMSCorrelationIDAsBytes=tuomas, JMSDeliveryMode=2, JMSDestination=queue://dest, JMSExpiration=0, JMSMessageID=ID:DESKTOP-LI5P50P-55150-1523104496195-4:2:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=temp-queue://ID:DESKTOP-LI5P50P-55150-1523104496195-6:1:1, JMSTimestamp=1523104498460, JMSType=null, JMSXGroupID=null, JMSXUserID=null}
2018-04-07 15:34:58.491 ERROR 7080 --- [sConsumer[dest]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:DESKTOP-LI5P50P-55150-1523104496195-4:2:1:1:1 on ExchangeId: ID-DESKTOP-LI5P50P-1523104493101-0-2). Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-DESKTOP-LI5P50P-1523104493101-0-2]
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route2 ] [route2 ] [jms1://queue:dest ] [ 15]
[route2 ] [log3 ] [log ] [ 0]
[route2 ] [log4 ] [log ] [ 0]
[route2 ] [setBody2 ] [setBody[constant{reply}] ] [ 0]
[route2 ] [process1 ] [Processor@0x6da9dde8 ] [ 15]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-DESKTOP-LI5P50P-1523104493101-0-2]
at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1846) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:385) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:66) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) [camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) [camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) [camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113) [camel-jms-2.21.0.jar:2.21.0]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:744) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:704) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1173) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1165) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1062) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: java.lang.AbstractMethodError: org.apache.activemq.ActiveMQConnection.createSession(I)Ljavax/jms/Session;
at tuomas.springcamel.configuration.camel.CamelRouteBuilder$1.process(CamelRouteBuilder.java:47) ~[classes/:na]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.21.0.jar:2.21.0]
... 18 common frames omitted
2018-04-07 15:34:58.491 WARN 7080 --- [sConsumer[dest]] o.a.c.c.jms.EndpointMessageListener : Execution of JMS message listener failed. Caused by: [org.apache.camel.CamelExecutionException - Exception occurred during execution on the exchange: Exchange[ID-DESKTOP-LI5P50P-1523104493101-0-2]]
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[ID-DESKTOP-LI5P50P-1523104493101-0-2]
at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1846) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:385) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:66) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) ~[camel-core-2.21.0.jar:2.21.0]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113) ~[camel-jms-2.21.0.jar:2.21.0]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:744) ~[spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:704) ~[spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1173) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1165) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1062) [spring-jms-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: java.lang.AbstractMethodError: org.apache.activemq.ActiveMQConnection.createSession(I)Ljavax/jms/Session;
at tuomas.springcamel.configuration.camel.CamelRouteBuilder$1.process(CamelRouteBuilder.java:47) ~[classes/:na]
at org.apache.camel.processor.DelegateSyncProcessor.process(DelegateSyncProcessor.java:63) ~[camel-core-2.21.0.jar:2.21.0]
... 18 common frames omitted
Why I can't create the ActiveMQ Session in the Camel processor, but in the bean I can? Does this have something to do with how Spring proxies the ConnectionFactory, bug in the camel or misconfiguration of the ActiveMQ broker?
The solution was to fix the following line
Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
to
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Seems like ActiveMQ is incompatible with the JMS API in use
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With