Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I make Spring JMSListener burst to max concurrent threads?

I have a Spring JMS Application that is using ActiveMQ version 5.10. I am performing a simple test to concurrency. I am using Spring Boot, current version and annotations to configure JMSListener and message producers.

The message producer just throws messsages on a queue as fast as it can. The message listener is pulling messages off the queue, but sleeping for 1 second after getting the message -- simulating some work that the message listener would need to do after getting a message.

I have the JMSListener set to 100-1000 concurrent threads. If I start the message producer and consumer at the same time (both run in their own JVM) the consumer never gets above the minimum configured threads, even though the max range is set 1000.

If I let the producer start first and place a few thousand messages on the queue, then start 1 or more instances of the consumer, it will raise the threads steadily, starting at 100 then 20 or so threads each second until it gets to a state where there is about 20-30 messages in the queue that are in-flight. It never catches the producer -- there is always some messages in queue even though the consumer is no where near its maxConcurrency count.

Why doesn't the message consumer burst into a bunch of additional threads to empty the queue instead of letting the queue have the 20-30 messages in it? Isn't there a way for the consumer continue to add threads faster in order to catch up with the messages in queue?

Here are the relevant parts of the code.

Message Producer

@Component
public class ClientServiceImpl implements ClientService {

    private static final String QUEUE="message.test.queue";

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void submitMessage(ImportantMessage importantMessage) {

        System.out.println("*** Sending " + importantMessage);
        jmsTemplate.convertAndSend(QUEUE, importantMessage);

    }
}

Message Consumer

@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqConsumerApplication.class, args);
    }
    @Value("${JMSHost}")
    private String JMS_BROKER_URL;

    @Autowired
    static Command command;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
        ((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
        ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
        ((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
        return factory;
    }

}

With the listener configured as such...

@Component
public class TransformationListener {

    private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";

    @JmsListener(destination=QUEUE, concurrency = "100-1000")
    public void handleRequest(ImportantMessage importantMessage) {
        System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
like image 603
Griff Avatar asked Feb 18 '16 21:02

Griff


People also ask

What is concurrency in JMS listener?

The Concurrency property specifies whether the message consumers use connection consumer or serialized processing. To use concurrent processing for a connection, select the Connection consumer setting. To use serial execution, select the Serial mode setting.

What is DefaultJmsListenerContainerFactory?

Class DefaultJmsListenerContainerFactoryA JmsListenerContainerFactory implementation to build a regular DefaultMessageListenerContainer . This should be the default for most users and a good transition paths for those that are used to build such container definition manually.

How does JMS listener work?

The JMS Listener adapter operates in an asynchronous mode. It establishes an asynchronous listener on the queue or topic destination specified by the JNDI name of Destination field. When a qualified message arrives at the destination, the adapter immediately processes the message.

How does Spring JMS listener work?

What is a Spring JMS Listener? In order to asynchronously receive JMS messages, Spring offers a solution to create message-driven POJOs (MDP). A message listener container is used to receive messages from a JMS broker. The container is a wrapper of sorts that calls a simple POJO listener class when a message arrives.


1 Answers

Are you still facing this behavior ? Did you read this advice "Pooled Consumers and prefetch" on http://activemq.apache.org/what-is-the-prefetch-limit-for.html Did you tried with prefetchSize=0 or 1 ? I think 1 can resolve your problem. If prefetchSize is > 1 maybe you need to decrease the AbortSlowAckConsumerStrategy to lower than default 30s. To have more than 100 threads consuming messages in your case you need more than 1000 messages not consumed and not prefetched in the queue because the prefetchSize is to 10.

like image 189
Hassen Bennour Avatar answered Oct 04 '22 04:10

Hassen Bennour