After a week of coding and searching forums, it seems timely to ask...
I have a C# application which processes messages sent by RabbitMQ using EventingBasicConsumer. I want to process several messages concurrently, so I have instantiated a few channels (8 in this case) on the same connection, each with a single consumer. I have then attached an event handler to each consumer's Received event. Based on all my reading so far, this setup should allow the event handler to be triggered concurrently by the consumers, each running in its own thread. But in my case the consumers receive messages sequentially, each one only after the previous consumer acknowledges its message.
Has anyone else experienced this behavior? Is my understanding correct that the processing should technically be concurrent in this case?
Below is some basic code to better illustrate the issue:
void Initialise()
{
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
        ConsumerChannels_[i] = Connection_.CreateModel();
        Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
        Consumers_[i].Received += MessageReceived;
    }
}

void MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");
    // do some time consuming processing here
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}
What I expect to see is something like: // concurrent processing
Consumer 1: processing started...
Consumer 2: processing started...
Consumer 3: processing started...
...
Consumer 6: processing ended.
Consumer 7: processing ended.
Consumer 8: processing ended.
But what I get instead is: // sequential processing
Consumer 1: processing started...
Consumer 1: processing ended.
Consumer 2: processing started...
Consumer 2: processing ended.
...
Consumer 8: processing started...
Consumer 8: processing ended.
Any ideas on how to proceed would be most appreciated.
Use multiple queues and consumers. Queues are single-threaded in RabbitMQ, and a single queue can handle up to about 50 thousand messages per second.
The RabbitMQ message broker was deployed atop Google Compute Engine where it demonstrated the ability to receive and deliver more than one million messages per second (a sustained combined ingress/egress of over two million messages per second).
RabbitMQ has a plugin for a consistent hash exchange. Using that exchange, with one consumer per queue, we can keep message order while still using multiple consumers. The hash exchange distributes routing keys among queues instead of distributing messages among queues, which means all messages with the same routing key will go to the same queue.
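For illustration, here is a minimal sketch of that setup, assuming the rabbitmq_consistent_hash_exchange plugin is enabled; the exchange and queue names are placeholders, and Connection_, ConsumerCount_ and MessageReceived are reused from the question:
// Sketch: one queue per consumer, all bound to a consistent-hash exchange.
// The binding routing key ("1") is the relative weight of each queue.
IModel setupChannel = Connection_.CreateModel();
setupChannel.ExchangeDeclare(exchange: "orders-hash", type: "x-consistent-hash",
                             durable: true, autoDelete: false, arguments: null);

for (int i = 0; i < ConsumerCount_; ++i)
{
    string queueName = "orders-" + i;
    setupChannel.QueueDeclare(queue: queueName, durable: true, exclusive: false,
                              autoDelete: false, arguments: null);
    setupChannel.QueueBind(queue: queueName, exchange: "orders-hash",
                           routingKey: "1", arguments: null);

    // One consumer, on its own channel, per queue.
    IModel consumerChannel = Connection_.CreateModel();
    var consumer = new EventingBasicConsumer(consumerChannel);
    consumer.Received += MessageReceived;
    consumerChannel.BasicConsume(queueName, /* autoAck */ false, consumer);
}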
Single active consumer allows only one consumer at a time to consume from a queue and fails over to another registered consumer if the active one is cancelled or dies. Consuming with only one consumer is useful when messages must be consumed and processed in the same order in which they arrive in the queue.
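As a rough sketch (requires RabbitMQ 3.8 or later; the queue name is illustrative, channel is assumed to be an open IModel, and System.Collections.Generic is imported), the feature is enabled through a queue argument:
// Sketch: declare a queue with single active consumer enabled.
var args = new Dictionary<string, object>
{
    { "x-single-active-consumer", true }
};
channel.QueueDeclare(queue: "ordered-work", durable: true, exclusive: false,
                     autoDelete: false, arguments: args);
// Register several consumers as usual; the broker delivers to only one of them
// at a time and fails over to the next if it is cancelled or dies.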
You can actually set the number of parallel processing tasks when creating your ConnectionFactory:
ConnectionFactory factory = new ConnectionFactory
{
ConsumerDispatchConcurrency = 2,
};
The default value is 1, which is serial/sequential processing.
I found this out by dissecting the .NET client's source code. Here's the interesting part (concurrency is set from ConsumerDispatchConcurrency):
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
But beware, this can result in race conditions! The property has this remark:
For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. In addition to that consumers need to be thread/concurrency safe.
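Tying it together, a minimal sketch assuming RabbitMQ.Client 6.x (the queue name and the simulated work are placeholders): a single channel and consumer whose Received handler may now run on several dispatcher tasks at once, so it has to be safe to run concurrently with itself:
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory
{
    ConsumerDispatchConcurrency = 8   // up to 8 Received handlers in flight
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    Thread.Sleep(1000);                      // simulate time-consuming processing
    channel.BasicAck(e.DeliveryTag, false);  // ack when processing is done
};
channel.BasicConsume("work-queue", false, consumer);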
You have two ways to do that:
Increase the concurrency by adding your own thread pool inside the handler:
void MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");
    // do some time consuming processing here
    // PUT your thread pool here and process the message inside the thread
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}
Note: BasicAck can be called from a different thread.
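For illustration, here is a sketch of that idea using the thread pool via Task.Run (it assumes using System.Threading.Tasks; and reuses GetConsumerIndex and Log_ from the question):
void MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    // Hand the slow work to the thread pool so the client's dispatch loop
    // is free to deliver the next message straight away.
    Task.Run(() =>
    {
        Log_.Debug("Consumer " + id + ": processing started...");
        // do some time consuming processing here
        sender.Model.BasicAck(e.DeliveryTag, false);   // ack from the worker thread
        Log_.Debug("Consumer " + id + ": processing ended.");
    });
}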
or
You can add more consumers to the queue: with QoS (prefetch count) = 1, the messages are consumed in round-robin.
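A rough sketch of that variant, reusing Connection_, ConsumerCount_ and MessageReceived from the question, with an illustrative queue name:
// Sketch: several consumers on the same queue, each with prefetch = 1.
// The broker deals messages out round-robin and will not send a consumer
// a second message until it has acked the current one.
for (int i = 0; i < ConsumerCount_; ++i)
{
    IModel channel = Connection_.CreateModel();
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += MessageReceived;
    channel.BasicConsume("work-queue", /* autoAck */ false, consumer);
}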