Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Keep spring context alive until JMS messages are consumed

I have a pretty standard setup related to JMS - Spring Boot and ActiveMQ. It works fine, until i tried to do a simple integration test. After some investigation I found that both the Spring context and the embedded broker are closed after the first JMS message has been consumed, no matter that during the consumption, another event is fired. The broker issue i was able to solve by adding the useShutdownHook=false connection option in the test setup, i.e.

spring.activemq.broker-url = vm://broker?async=false&broker.persistent=false&broker.useShutdownHook=false

What i'm looking for is basically a way to force the test to "stay alive" until all JMS messages are consumed (in this case they are just two). I understand the async nature of the whole setup, but still during tests it would be helpful to get all the results of these messages being produced and consumed.

Below is my setup, although it's fairly simple.

@EnableJms
public class ActiveMqConfig {

    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(messageConverter);
        return jmsTemplate;
    }

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setTargetType(MessageType.TEXT);
        messageConverter.setTypeIdPropertyName("_type");
        return messageConverter;
    }
}

I then have a message-driven POJO that listens for a given event:

@JmsListener(destination = "events")
public void applicationSubmitted(MyType event) {
    // do some work with the event here

    jmsTemplate.convertAndSend("commands", mymessage);
}

And another one:

@JmsListener(destination = "commands")
public void onCommand(TextMessage textMessage) {

}

One thing that I tried out and it worked is to add a delay, i.e. sleep(200) after the message is sent. However, that's very unreliable and also slows down tests, as the execution perhaps takes less that 50ms. Below's the test itself. Unless the waiting is uncommented, i never get to the second event listener, as the application context closes, the tests ends and the message is "forgotten".

@SpringBootTest
class MyEventIntegrationTest extends Specification {

    @Autowired
    JmsTemplate jmsTemplate

    def "My event is successfully handled"() {

        given:
        def event = new MyEvent()

        when:
        jmsTemplate.convertAndSend("events", event)
        // sleep(200)

        then:
        1 == 1
    }
}
like image 674
Milan Milanov Avatar asked Mar 14 '18 16:03

Milan Milanov


2 Answers

Well, this is a standard problem when testing systems based on async message exchange. Usually, it's solved in the part of the test that you skipped - the then part.

The thing is that in your tests you usually expect the system to do something useful, e.g. make changes in the DB, send a rest call to another system, send a message in another queue etc. We could wait some time until it happened by constantly checking the result - if the result is achieved within the time window that we have set - then we can assume the test has passed.

The pseudo code of this approach is the following:

for (i to MAX_RETRIES; i++) {
   checkThatTheChangesInDBHasBeenMade();
   checkThatTheRestCallHasBeenMade();
   checkThatTheMessageIsPostedInAnotherQueue();

   Thread.sleep(50ms);
}

This way in the best scenario your test will pass in 50ms. In the worse case, it will fail and this will take MAX_RETRIES * 50ms time to execute the test.

Also, I should mention that there is a nice tool called awaitility that provides nice API (btw it has support of groovy DSL) to handle such kind of problems in the async world:

await().atMost(5, SECONDS).until(customerStatusIsUpdated());
like image 151
Danylo Zatorsky Avatar answered Sep 23 '22 00:09

Danylo Zatorsky


I think the root of your problem is the asynchronous event handling. After you've send the event, your test is just over. This will - of course - cause the Spring context and the broker to shutdown. The JMS listeners are running in another thread. You must find a way to wait for them. Otherwise, your thread (which is your test case) is just done.

We faced a similar problem in our last project and wrote a small utility which helped us a lot. JMS offers the ability to "browse" a queue and to look if it's empty:

public final class JmsUtil {

    private static final int MAX_TRIES = 5000;
    private final JmsTemplate jmsTemplate;

    public JmsUtil(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    private int getMessageCount(String queueName) {
        return jmsTemplate.browseSelected(queueName, "true = true", (s, qb) -> Collections.list(qb.getEnumeration()).size());
    }

    public void waitForAll(String queueName) {
        int i = 0;
        while (i <= MAX_TRIES) {
            if (getMessageCount(queueName) == 0) {
                return;
            }
            i++;
        }
}

With this utility you could do something like this:

def "My event is successfully handled"() {

        given:
        def event = new MyEvent()

        when:
        jmsTemplate.convertAndSend("events", event)
        jmsUtility.waitForAll("events"); // wait until the event has been consumed
        jmsUtility.waitForAll("commands"); // wait until the command has been consumed

        then:
        1 == 1
    }

Note: This utility assumes that you send JMS messages to a queue. By browsing the queue we can check if it's empty. In case of a topic you might need to do another check. So be aware of that!

like image 32
Thomas Uhrig Avatar answered Sep 25 '22 00:09

Thomas Uhrig