I have a DefaultMessageListenerContainer, which is (in my opinion) not scaling up. The Container is defined to listen on a queue, where 100 messages are located in.
I would expect, that the Container is going to any lengths, that the messages would be consumed as fast as it is possible (by observing the maxConcurrentConsumers configuration). So i would assume, that there are 7 concurrentConsumers. (beginning by 2 concurrentConsumers at container-startup) Some logging-information:
activeConsumerCount: 5
concurrentConsumers: 2
scheduledConsumerCount: 5
idleConsumerLimit: 1
idleTaskExecLimit: 1
maxConcurrentConsumers: 7
My Spring-config (a part of it):
<bean id="abstractMessageListenerContainer" class="my.package.structure.LoggingListenerContainer" abstract="true">
<property name="connectionFactory" ref="jmscfCee" />
<property name="maxConcurrentConsumers" value="7"/>
<property name="receiveTimeout" value="100000" />
<property name="concurrentConsumers" value="2" />
</bean>
<bean class="my.package.structure.LoggingListenerContainer" parent="abstractMessageListenerContainer">
<property name="destinationName" value="MY.QUEUE" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<bean id="myMessageListener" class="my.package.structure.ListenerClass"></bean>
My Logging Container
public class LoggingListenerContainer extends DefaultMessageListenerContainer{
private static final Logger logger = Logger
.getLogger(LoggingListenerContainer.class);
@Override
protected void doInvokeListener(MessageListener listener, Message message)
throws JMSException {
logger.info("activeConsumerCount: " + this.getActiveConsumerCount());
logger.info("concurrentConsumers: " + this.getConcurrentConsumers());
logger.info("scheduledConsumerCount: " + this.getScheduledConsumerCount());
logger.info("idleConsumerLimit: " + this.getIdleConsumerLimit());
logger.info("idleTaskExecLimit: " + this.getIdleTaskExecutionLimit());
logger.info("maxConcurrentConsumers: " + this.getMaxConcurrentConsumers());
super.doInvokeListener(listener, message);
}
My Listener Class:
public class ListenerClass implements MessageListener {
public void onMessage(Message msg) {
//Do some business function
}
}
Could someone be so kind to correct my configuration or give me some tipps concerning my configuration or explain me the approach of the Container? (if i had misunderstood something)
I'm locally testing with ActiveMQ (in Production with WebSphere MQ) - if it's relevant for scalability topics.
EDIT:
<bean id="jmscfCee" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${jmscfCee.hostName}</value>
</property>
</bean>
<bean id="jmscfCeeCachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory ">
<constructor-arg ref="jmscfCee" />
<property name="sessionCacheSize" value="10" />
</bean>
It depends. I had a similar issue with ActiveMQ a few years back, whereby its default behaviour is heavily opimized towards high volumes (many thousands) of small messages. By default each consumer will pre-fetch messages in batches of 1000, so if you have small numbers of messages you'll probably find they have all ended up in the pre-fetch buffer of one consumer, leaving the other consumers idle.
You can tune this behaviour using a prefetch policy, either on the connection URI or in the Spring configuration if that's how you're building your connection factory.
<amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost">
<property name="prefetchPolicy">
<amq:prefetchPolicy all="1" />
</property>
</amq:connectionFactory>
The version of ActiveMQ I was using at the time didn't support a prefetch limit of 0 (i.e. don't prefetch, just go to the broker every time) but the documentation suggests that this is now allowed.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With