I have configured a Spring DefaultMessageListenerContainer as an ActiveMQ consumer that consumes messages from a queue. Let's call it "Test.Queue". I have this code deployed on 4 different machines, and all of them are configured against the same ActiveMQ instance to process messages from the same "Test.Queue" queue.
I set the max consumer size to 20. As soon as all 4 machines are up and running, the consumer count shown against the queue is 80 (4 * max consumer size = 80).
Everything is fine even when the number of messages produced and sent to the queue grows large.
But when there are thousands of messages and one of the 80 consumers gets stuck, it freezes ActiveMQ, which stops sending messages to the other consumers.
All messages are stuck in ActiveMQ forever.
With 4 machines and up to 80 consumers, I have no way to tell which consumer failed to acknowledge.
I stop and restart all 4 machines, and as soon as I stop the machine with the stuck consumer, messages start flowing again.
I don't know how to configure DefaultMessageListenerContainer to abandon the bad consumer and signal ActiveMQ immediately to start sending messages.
I was able to create the scenario even without Spring as follows:
I created 2 consumers (A and B). In consumer B's onMessage() method, I put the thread to sleep for a long time (Thread.sleep(Long.MAX_VALUE)) under a condition: when current time % 13 is 0, the thread goes to sleep.
I ran these 2 consumers.
Let's say A, B, and C are managed by Spring's DefaultMessageListenerContainer. How do I tweak DefaultMessageListenerContainer to take the bad consumer (in my case consumer B) out of the pool after it has failed to acknowledge for X seconds, and notify the broker immediately so that it does not hold on to messages forever?
Thanks for your time.
I would appreciate a solution to this problem.
Here are a few options to try:
Set the queue prefetch to 0 to promote better distribution across consumers and reduce 'stuck' messages on specific consumers. See http://activemq.apache.org/what-is-the-prefetch-limit-for.html
Set "?useKeepAlive=false&wireFormat.maxInactivityDuration=20000" on the connection to time out the slow consumer after a specified period of inactivity.
Set the queue policy "slowConsumerStrategy->abortSlowConsumerStrategy", again to time out a slow consumer:
<policyEntry ...
  ...
  <slowConsumerStrategy>
    <abortSlowConsumerStrategy />
  </slowConsumerStrategy>
  ...
</policyEntry>
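To illustrate why the prefetch option matters, here is a rough sketch in plain Java with no JMS dependency. It is a toy model, not ActiveMQ's actual dispatch algorithm. The class PrefetchSketch, the Consumer type, and the simulate() method are all hypothetical names made up for this example. It assumes the broker fills each consumer's local buffer up to the prefetch limit, that a stuck consumer never acknowledges anything, and that prefetch 0 behaves like a consumer holding only the single message it is working on.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only: this is NOT ActiveMQ's real dispatch logic. */
final class PrefetchSketch {

    /** Hypothetical consumer holding dispatched but unacknowledged messages. */
    static final class Consumer {
        final boolean stuck; // a stuck consumer never processes or acknowledges
        final Deque<Integer> buffer = new ArrayDeque<>();

        Consumer(boolean stuck) {
            this.stuck = stuck;
        }
    }

    /** Returns how many messages are never processed once the system stops making progress. */
    static int simulate(int messageCount, int prefetch, int consumerCount, int stuckIndex) {
        // Assumption: prefetch 0 (pull mode) is modeled as one in-flight message per consumer.
        int capacity = Math.max(prefetch, 1);

        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        List<Consumer> consumers = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            consumers.add(new Consumer(i == stuckIndex));
        }

        boolean progress = true;
        while (progress) {
            progress = false;
            // Dispatch step: top up every consumer's buffer to the prefetch capacity.
            for (Consumer c : consumers) {
                while (c.buffer.size() < capacity && !queue.isEmpty()) {
                    c.buffer.add(queue.poll());
                    progress = true;
                }
            }
            // Processing step: each healthy consumer handles and acknowledges one message.
            for (Consumer c : consumers) {
                if (!c.stuck && !c.buffer.isEmpty()) {
                    c.buffer.poll();
                    progress = true;
                }
            }
        }

        // Anything left in a buffer (or in the queue) was never processed.
        int stranded = queue.size();
        for (Consumer c : consumers) {
            stranded += c.buffer.size();
        }
        return stranded;
    }

    public static void main(String[] args) {
        // Two consumers, A (index 0) healthy and B (index 1) stuck, 5000 messages.
        System.out.println("prefetch 1000: stranded = " + simulate(5000, 1000, 2, 1));
        System.out.println("prefetch 0:    stranded = " + simulate(5000, 0, 2, 1));
    }
}
```

In this model, a prefetch of 1000 (ActiveMQ's default for queue consumers) leaves 1000 messages parked on the stuck consumer B, while prefetch 0 leaves only the one message B was handling. The real broker will behave differently in the details, so treat the numbers as an illustration of the trend rather than a prediction.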