
Where is my expired JMS message?

I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading ExchangePattern.InOnly messages from a JMS queue, and I want any message that is not processed within a given time to expire and be routed explicitly to a named dead letter queue.

I have the following route:

public class FulfillmentRequestRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=10000&transacted=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}

And the following ActiveMQ config:

<!-- Configure the ActiveMQ JMS broker server to listen on TCP port 61610 -->
<broker:broker useJmx="true" persistent="true" brokerName="myBroker">
    <broker:transportConnectors>
        <!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
        <broker:transportConnector name="vm" uri="vm://myBroker" />
        <!-- expose a TCP transport for clients to use -->
        <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
    </broker:transportConnectors>
    <broker:persistenceAdapter>
        <broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
    </broker:persistenceAdapter>
    <broker:destinationPolicy>
        <broker:policyMap>
          <broker:policyEntries>
            <!-- Set the following policy on all queues using the '>' wildcard -->
            <broker:policyEntry queue=">">
              <broker:deadLetterStrategy>
                <broker:sharedDeadLetterStrategy processExpired="true"
                                                 processNonPersistent="true" />
              </broker:deadLetterStrategy>
            </broker:policyEntry>
          </broker:policyEntries>
        </broker:policyMap>
    </broker:destinationPolicy>
</broker:broker>

<!-- Configure Camel ActiveMQ to use the embedded ActiveMQ broker declared above -->
<!-- Using the ActiveMQComponent gives us connection pooling for free -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="vm://myBroker" />
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="acceptMessagesWhileStopping" value="false"/>
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://myBroker" />
</bean>

Finally, I have a unit test which creates two messages: one which will be processed, and another which should time out.

@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {

        // Given
        consumer = context.createConsumerTemplate();

        int expectedValidMessageCount = 2;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);        

        // When 
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);

        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        mockEndpoint.assertIsSatisfied();                         // This should not pass with "2" set above

        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);

        assertEquals(xmlBody1, notTimedOutMessageBody);

        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody  = consumer.receiveBodyNoWait("jms:queue:dead");
        assertNotNull("Should not lose the message", dlqBody);          // This fails
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {

        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}

The second message isn't expiring at all, despite taking into account @Petter's tip (thanks) below.

I have this unit test pattern working elsewhere, in tests which explicitly throw exceptions from within transactions in Camel, but I'd prefer not to start inspecting headers manually when this all seems to be handled already.

Andrew Harmel-Law asked May 17 '13 15:05


1 Answer

Messages that have expired, either while sitting on the queue or by the time they reach it, are handled by ActiveMQ in one of two ways:

  1. If the message is persistent then it will be placed on the queue ActiveMQ.DLQ.
  2. If the message is non persistent, it will be discarded right away without further notice.
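The two cases above can be sketched as a small decision function. This is only a simplified model in plain Java, not ActiveMQ's own code: the class name `ExpiryFateSketch`, the `Fate` enum and the `fateOf` method are hypothetical names made up for illustration. The two boolean flags mirror the `processExpired` and `processNonPersistent` attributes that appear on the `sharedDeadLetterStrategy` in the question's config, and the sketch assumes they behave as their names suggest.

```java
public class ExpiryFateSketch {

    /** What happens to a message in this simplified model (hypothetical type). */
    enum Fate { DELIVER, DEAD_LETTER, DISCARD }

    /**
     * Simplified model of the rule described above (an assumption, not broker code).
     *
     * @param persistent           true if the message was sent with persistent delivery
     * @param expiration           JMSExpiration in epoch millis; 0 means "never expires"
     * @param now                  current time in epoch millis
     * @param processExpired       mirrors the dead letter strategy attribute of that name
     * @param processNonPersistent mirrors the dead letter strategy attribute of that name
     */
    static Fate fateOf(boolean persistent, long expiration, long now,
                       boolean processExpired, boolean processNonPersistent) {
        boolean expired = expiration != 0 && now > expiration;
        if (!expired) {
            return Fate.DELIVER;            // still valid, goes to the consumer
        }
        if (!processExpired) {
            return Fate.DISCARD;            // expired messages are not dead-lettered at all
        }
        if (persistent || processNonPersistent) {
            return Fate.DEAD_LETTER;        // case 1, or case 2 with the flag switched on
        }
        return Fate.DISCARD;                // case 2: non-persistent, dropped silently
    }

    public static void main(String[] args) {
        long now = 100_000L;
        long past = now - 10_000L;          // an expiration that has already passed

        System.out.println(fateOf(true, past, now, true, false));   // DEAD_LETTER
        System.out.println(fateOf(false, past, now, true, false));  // DISCARD
        System.out.println(fateOf(false, past, now, true, true));   // DEAD_LETTER
        System.out.println(fateOf(true, 0L, now, true, false));     // DELIVER
    }
}
```

Under this model, whether an expired message shows up on a dead letter queue depends on the delivery mode of the message and the strategy flags together, so both are worth checking when a message seems to vanish.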

In your ActiveMQ config, you have disabled persistence, so a quick guess is that your expired messages are being deleted on sight, without any notice to your application whatsoever.


Petter Nordlander answered Oct 17 '22 05:10