I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading ExchangePattern.InOnly messages from a JMS queue, and I want any message that is not processed within a given time to expire and be moved explicitly to a named dead letter queue.
I have the following route:
public class FulfillmentRequestRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=10000&transacted=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}
And the following ActiveMQ config:
<!-- Configure the ActiveMQ JMS broker server to listen on TCP port 61610 -->
<broker:broker useJmx="true" persistent="true" brokerName="myBroker">
    <broker:transportConnectors>
        <!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
        <broker:transportConnector name="vm" uri="vm://myBroker" />
        <!-- expose a TCP transport for clients to use -->
        <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
    </broker:transportConnectors>
    <broker:persistenceAdapter>
        <broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
    </broker:persistenceAdapter>
    <broker:destinationPolicy>
        <broker:policyMap>
            <broker:policyEntries>
                <!-- Set the following policy on all queues using the '>' wildcard -->
                <broker:policyEntry queue=">">
                    <broker:deadLetterStrategy>
                        <broker:sharedDeadLetterStrategy processExpired="true"
                                                         processNonPersistent="true" />
                    </broker:deadLetterStrategy>
                </broker:policyEntry>
            </broker:policyEntries>
        </broker:policyMap>
    </broker:destinationPolicy>
</broker:broker>

<!-- Configure Camel ActiveMQ to use the embedded ActiveMQ broker declared above -->
<!-- Using the ActiveMQComponent gives us connection pooling for free -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="vm://myBroker" />
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="acceptMessagesWhileStopping" value="false"/>
</bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://myBroker" />
</bean>
Finally, I have a unit test which creates two messages: one which will be processed, and another which should time out.
@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {
        // Given
        consumer = context.createConsumerTemplate();
        int expectedValidMessageCount = 2;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);

        // When
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);
        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        mockEndpoint.assertIsSatisfied(); // This should not pass with "2" set above
        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);
        assertEquals(xmlBody1, notTimedOutMessageBody);
        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead");
        assertNotNull("Should not lose the message", dlqBody); // This fails
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {
        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}
The second message isn't expiring at all, despite taking into account @Petter's tip (thanks) below.
I have this unit-test pattern working elsewhere, in tests which explicitly throw exceptions from within transactions in Camel, but I'd prefer not to start inspecting headers manually when all of this seems to be handled already.
Messages that have expired, either while sitting on the queue or when they hit the queue, are handled by ActiveMQ in one of two ways: they are moved to a dead letter queue (ActiveMQ.DLQ by default), or they are discarded. In your ActiveMQ config you have disabled persistence, so a quick guess is that your messages are being deleted on sight, without any notice to your application whatsoever.
Read more here
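To make the expiry handling described above more concrete, here is a rough plain-Java model of the decision as I understand it. It is only a sketch: `ExpiryModel` and its method names are hypothetical and are not part of ActiveMQ, Camel or JMS. The two boolean flags mirror the `processNonPersistent` and `processExpired` attributes from the question's `sharedDeadLetterStrategy`, and treating a time-to-live of zero or less as "never expires" is an assumption of this model.

```java
// Simplified, illustrative model only; not actual ActiveMQ or JMS code.
public class ExpiryModel {

    // JMS convention: expiration = send time + time-to-live.
    // An expiration of 0 means the message never expires.
    // Assumption: a TTL of zero or less is treated as "no expiry".
    public static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis <= 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    // A message is expired once the clock has passed a non-zero expiration.
    public static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis > 0 && nowMillis > expirationMillis;
    }

    // Decides whether a message that can no longer be delivered goes to the
    // dead letter queue (true) or is simply discarded (false).
    public static boolean sentToDeadLetterQueue(boolean persistent, boolean expired,
                                                boolean processNonPersistent,
                                                boolean processExpired) {
        if (!persistent && !processNonPersistent) {
            return false; // non-persistent messages are dropped unless opted in
        }
        if (expired && !processExpired) {
            return false; // expired messages are dropped unless opted in
        }
        return true;
    }

    public static void main(String[] args) {
        long sent = 1_000_000L;
        // timeToLive=10000, as on the route's endpoint URI in the question
        long expiration = expirationFor(sent, 10_000L);
        boolean expired = isExpired(expiration, sent + 12_000L);

        System.out.println("expired: " + expired);
        System.out.println("persistent, to DLQ: "
                + sentToDeadLetterQueue(true, expired, false, true));
        System.out.println("non-persistent, processNonPersistent=false, to DLQ: "
                + sentToDeadLetterQueue(false, expired, false, true));
        System.out.println("non-persistent, processNonPersistent=true, to DLQ: "
                + sentToDeadLetterQueue(false, expired, true, true));
    }
}
```

In this model an expired non-persistent message only reaches the dead letter queue when `processNonPersistent` is true, which is why that flag matters whenever messages are sent non-persistently.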