For my current project I need to consume messages from many destinations (from a few hundred up to 20,000 or 30,000); all destinations are topics. Currently, for initial load tests, all messages are created locally on the same server, in a thread pool.
My current Spring config uses an embedded ActiveMQ broker in a network of brokers (for clustering) and DefaultMessageListenerContainers (DMLCs) that share a common TaskExecutor. While the number of destinations is very high, the throughput of each destination is relatively low.
My only requirement is that all messages are consumed as soon as possible.
My Config:
<bean id="connectionfactory" class="org.springframework.jms.connection.CachingConnectionFactory" destroy-method="destroy">
<property name="targetConnectionFactory">
<ref bean="amqConnectionFactory" />
</property>
</bean>
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost:61616?async=false&amp;jms.dispatchAsync=false" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
<bean id="listenerThreadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="70" />
<property name="maxPoolSize" value="70" />
<property name="daemon" value="true" />
<property name="keepAliveSeconds" value="60" />
</bean>
<!-- Message Listener Container Template for Topics -->
<bean id="topiccontainertemplate" class="org.springframework.jms.listener.DefaultMessageListenerContainer" scope="prototype"
destroy-method="destroy">
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionfactory" />
<property name="pubSubDomain" value="true" />
<property name="cacheLevelName" value="CACHE_CONSUMER" />
<property name="destinationName" value="default" />
<property name="maxMessagesPerTask" value="1" />
<property name="receiveTimeout" value="1" />
<property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
</bean>
My code uses the application context as a DMLC-Factory and sets the final configuration of the container:
AbstractMessageListenerContainer container = context.getBean("simpletopiccontainertemplate", AbstractMessageListenerContainer.class);
container.setDestinationName(localEntity.getId().getDestination());
container.setMessageListener(mylistener);
container.start();
While we don't lose messages in this configuration, the transit time for an individual message can be very long.
Q1: Is there a more efficient way to listen to a large number of destinations?
Q2: Are there possible improvements for my listener configuration?
Q3: Besides the DMLC I also tried the SimpleMessageListenerContainer, but I couldn't get it to work. Is there something wrong with my config?
<bean id="simpletopiccontainertemplate" class="org.springframework.jms.listener.SimpleMessageListenerContainer" scope="prototype"
destroy-method="destroy">
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionfactory" />
<property name="destinationName" value="default" />
<property name="concurrency" value="1" />
<property name="pubSubDomain" value="true" />
<property name="taskExecutor" ref="listenerThreadPoolTaskExecutor" />
</bean>
Do I understand correctly that you are manually creating 20-30 thousand SimpleMessageListenerContainers, each listening on a different topic? I'm surprised that it even works, since every listener by default creates one thread, and 20-30 thousand threads running concurrently in one JVM is impressive (and scary).
I see two ways to improve your performance:
1. Read messages manually, e.g. using the JmsTemplate.receive() convenience method. Just remember to use a very low timeout. By iteratively checking each and every topic in fewer threads (10? 100?) you will experience some latency, but fewer threads means less time wasted on context switching. Honestly, I don't think this solution will scale and give you better performance, but try it, experimenting with different numbers of threads and timeouts.
2. Cluster! I am afraid that listening on 20-30k topics with one application will never work (fast enough). However, it seems you can easily scale out this task. Throw in ten or maybe a hundred servers and let each listen on two thousand or two hundred topics, respectively. This will scale linearly.
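The first suggestion (a few threads iterating over all topics) could look roughly like the sketch below. It is only an illustration under stated assumptions: RoundRobinPoller, Receiver and receiveNoWait are hypothetical names, not part of Spring or ActiveMQ, and the actual JMS call is hidden behind the Receiver interface so that the polling logic can be tried without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

/** Hypothetical helper (not from the original post): a few threads poll many destinations. */
final class RoundRobinPoller<M> {

    /**
     * Non-blocking receive for one destination; returns null when nothing is waiting.
     * Assumption: in a Spring application this could delegate to
     * JmsTemplate.receive(destinationName) on a template with a very low receive timeout.
     */
    interface Receiver<M> {
        M receiveNoWait(String destination);
    }

    private final List<String> destinations;
    private final Receiver<M> receiver;
    private final BiConsumer<String, M> handler;
    private final int workerCount;
    private final ExecutorService pool;

    RoundRobinPoller(List<String> destinations, Receiver<M> receiver,
                     BiConsumer<String, M> handler, int workerCount) {
        this.destinations = new ArrayList<>(destinations);
        this.receiver = receiver;
        this.handler = handler;
        this.workerCount = workerCount;
        // Daemon threads, so an idle poller never keeps the JVM alive.
        this.pool = Executors.newFixedThreadPool(workerCount, runnable -> {
            Thread thread = new Thread(runnable, "topic-poller");
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Visits every destination once, spread over the workers; returns the number of messages handled. */
    int pollOnce() {
        List<Callable<Integer>> slices = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            final int offset = w;
            // Worker w handles destinations w, w + workerCount, w + 2 * workerCount, ...
            slices.add(() -> drainSlice(offset));
        }
        try {
            int total = 0;
            for (Future<Integer> slice : pool.invokeAll(slices)) {
                total += slice.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Polling pass interrupted", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Receiver or handler failed", e.getCause());
        }
    }

    private int drainSlice(int offset) {
        int handled = 0;
        for (int i = offset; i < destinations.size(); i += workerCount) {
            String destination = destinations.get(i);
            M message;
            // Drain whatever is waiting, then move on to the next destination.
            while ((message = receiver.receiveNoWait(destination)) != null) {
                handler.accept(destination, message);
                handled++;
            }
        }
        return handled;
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

In a real application the Receiver would presumably wrap a JmsTemplate, and pollOnce() would be called in a loop; the worker count is the knob to experiment with. One thing to keep in mind with topics: a non-durable subscriber only sees messages published while its consumer exists, so polling with short-lived consumers may miss messages unless durable subscriptions or cached consumers are used.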
I moved away from the DefaultMessageListenerContainer to the SimpleMessageListenerContainer. The simple container uses push messaging. This way (and with appropriate config), the embedded broker pushes messages into the listener instead of the listener polling the destination.