Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I throttle the amount of messages coming from ActiveMQ in my C# app?

I'm using ActiveMQ in a .Net program and I'm flooded with message-events.

In short when I get a queue-event 'onMessage(IMessage receivedMsg)' I put the message into an internal queue out of which X threads do their thing.

At first I had: 'AcknowledgementMode.AutoAcknowledge' when creating the session so I'm guessing that all the messages in the queue got sucked down and put into the memory queue (which is risky since with a crash, everything is lost).

So then I used: 'AcknowledgementMode.ClientAcknowledge' when creating the session, and when a worker was ready with the message it calls the 'commit()' method on the message. However, still all the messages get sucked down from the queue.

How can I configure it that ONLY an X amount of messages are being processed or are in an internal queue, and that not everything is being 'downloaded' right away?

like image 717
Toad Avatar asked Nov 25 '10 13:11

Toad


2 Answers

Are you on .NET 4.0? You could use a BlockingCollection . Set it to the maximum amount it may contain. As soon as a thread tries to put in an excess element, the Add operation will block until the collection falls below the threshold again.

Maybe that would do it for throttling?

There is also an API for throttling in the Rx framework, but I do not know how it is implemented. If you implement your Queue source as Observable, this API would become available for you, but I don't know if this hits your needs.

like image 198
flq Avatar answered Sep 28 '22 05:09

flq


You can set the client prefetch to control how many messages the client will be sent. When the Session is in Auto Ack, the client will only ack a message once its been delivered to your app via the onMessage callback or through a synchronous receive. By default the client will prefetch 1000 messages from the broker, if the client goes down these messages would be redelivered to another client it this was a Queue, otherwise for a topic they are just discarded as a topic is a broadcast based channel. If you set the prefetch to one then you client would only be sent one message from the sever, then each time your onMessage callback completes a new message would be dispatched as the client would ack that message, that is if the session is in Auto Ack mode.

Refer to the NMS configuration page for all the options: http://activemq.apache.org/nms/configuring.html

Regards

Tim. FuseSource.com

like image 27
Tim Bish Avatar answered Sep 28 '22 07:09

Tim Bish