Concurrency in RabbitMQ

After a week of coding and searching forums, it seems timely to ask...

I have a C# application that processes messages delivered by RabbitMQ using EventingBasicConsumer. I want to process several messages concurrently, so I have created a few channels (8 in this case) on the same connection, each with a single consumer, and attached an event handler to each consumer's Received event. Based on everything I have read so far, this setup should allow the event handler to be triggered concurrently by the consumers, each running in its own thread. In my case, however, the consumers receive messages sequentially: each one gets a message only after the previous consumer has acknowledged its own.

Has anyone else experienced this behavior? Is my understanding correct that the processing should technically be concurrent in this case?

Below is a simplified version of the code that illustrates the issue:

void Initialise()
{
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
         // one channel per consumer, all on the same connection
         ConsumerChannels_[i] = Connection_.CreateModel();
         Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
         Consumers_[i].Received += MessageReceived;
    }
}

// Received is an EventHandler<BasicDeliverEventArgs>, so the sender arrives as object.
void MessageReceived(object sender, BasicDeliverEventArgs e)
{
    var consumer = (IBasicConsumer)sender;
    int id = GetConsumerIndex(consumer);
    Log_.Debug("Consumer " + id + ": processing started...");
    // do some time consuming processing here
    consumer.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}

What I expect to see is something like: // concurrent processing

Consumer 1: processing started...

Consumer 2: processing started...

Consumer 3: processing started...

...

Consumer 6: processing ended.

Consumer 7: processing ended.

Consumer 8: processing ended.

But what I get instead is: // sequential processing

Consumer 1: processing started...

Consumer 1: processing ended.

Consumer 2: processing started...

Consumer 2: processing ended.

...

Consumer 8: processing started...

Consumer 8: processing ended.

Any ideas on how to proceed would be most appreciated.

Kia asked Oct 31 '16 08:10



2 Answers

You can actually set the number of parallel processing tasks when creating your ConnectionFactory!

ConnectionFactory factory = new ConnectionFactory
{
    ConsumerDispatchConcurrency = 2,
};

The default value is 1, which is serial/sequential processing.

I found this out by dissecting the .NET client's source code. Here's the interesting part (concurrency is set from ConsumerDispatchConcurrency):

Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
    _worker = Task.Run(loopStart);
}
else
{
    var tasks = new Task[concurrency];
    for (int i = 0; i < concurrency; i++)
    {
        tasks[i] = Task.Run(loopStart);
    }
    _worker = Task.WhenAll(tasks);
}

But beware, this can result in race conditions! The property has this remark:

For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. In addition to that consumers need to be thread/concurrency safe.
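To illustrate the remark above, here is a minimal sketch of a consumer whose handler is written to tolerate concurrent invocation. It assumes RabbitMQ.Client 6.2 or later with the synchronous API. The host name, the queue name "work", the Processed_ counter, and the simulated one-second workload are all hypothetical and not taken from the question.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class ConcurrentConsumer
{
    // Shared state touched by several dispatch workers, so it is updated atomically.
    static int Processed_;

    static void Main()
    {
        var factory = new ConnectionFactory
        {
            HostName = "localhost",          // assumption: a broker running locally
            ConsumerDispatchConcurrency = 4, // allow up to 4 handler invocations to overlap
        };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Guards channel operations issued from concurrently running handlers.
            var channelLock = new object();

            channel.QueueDeclare(queue: "work", durable: false, exclusive: false,
                                 autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                Thread.Sleep(1000); // stand-in for the time-consuming processing

                // Interlocked avoids a lost update when two handlers finish together.
                int total = Interlocked.Increment(ref Processed_);
                Console.WriteLine("Processed " + total + " message(s)");

                // Serialize the ack, since a channel is not meant to be used
                // by several threads at the same time.
                lock (channelLock)
                {
                    channel.BasicAck(e.DeliveryTag, false);
                }
            };

            channel.BasicConsume(queue: "work", autoAck: false, consumer: consumer);
            Console.ReadLine(); // keep the process alive while messages arrive
        }
    }
}
```

With this setting, messages may finish out of order, so anything that depends on arrival order needs its own coordination.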

Christian Davén answered Sep 18 '22 07:09


You have two ways to do that:

Increase the concurrency by adding your own thread pool inside the handler:

void MessageReceived(object sender, BasicDeliverEventArgs e)
{
    var consumer = (IBasicConsumer)sender;
    int id = GetConsumerIndex(consumer);
    Log_.Debug("Consumer " + id + ": processing started...");
    // do some time consuming processing here
    // PUT your thread-pool here and process the messages inside the thread

    consumer.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}

Note: BasicAck can be called from a different thread.
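One way the thread-pool approach could look is sketched below, using Task.Run to hand each delivery to the .NET thread pool. This is an illustration rather than the answerer's exact method. It assumes RabbitMQ.Client 6.x (where e.Body is a ReadOnlyMemory<byte>); the host name, the queue name "work", the prefetch count of 8, and the simulated workload are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class OffloadingConsumer
{
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" }; // assumption: local broker

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Guards acks sent from several pool threads on the same channel.
            var channelLock = new object();

            channel.QueueDeclare(queue: "work", durable: false, exclusive: false,
                                 autoDelete: false, arguments: null);

            // Cap the number of unacknowledged messages so the pool is not flooded.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 8, global: false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // Copy what the worker needs before the handler returns:
                // the body buffer should not be relied on afterwards.
                byte[] body = e.Body.ToArray();
                ulong tag = e.DeliveryTag;

                // Return immediately so the next delivery can be dispatched.
                Task.Run(() =>
                {
                    Thread.Sleep(1000); // stand-in for the time-consuming processing
                    Console.WriteLine("Handled " + body.Length + " byte(s)");

                    lock (channelLock)
                    {
                        channel.BasicAck(tag, false);
                    }
                });
            };

            channel.BasicConsume(queue: "work", autoAck: false, consumer: consumer);
            Console.ReadLine(); // keep the process alive while messages arrive
        }
    }
}
```

Because the handler returns right away, the prefetch count is what bounds how many messages are in flight at once.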

or

Add more consumers to the queue. With QoS (prefetch count) set to 1, the messages are distributed to the consumers in round-robin fashion.
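The second option might be set up roughly as follows: one channel per consumer, each limited to a single unacknowledged message through BasicQos. This is a sketch assuming RabbitMQ.Client 6.x; the host name, the queue name "work", and the simulated workload are hypothetical. Whether handlers on separate channels actually run in parallel depends on the client version in use.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class RoundRobinConsumers
{
    const int ConsumerCount = 8;

    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" }; // assumption: local broker

        using (var connection = factory.CreateConnection())
        {
            for (int i = 0; i < ConsumerCount; ++i)
            {
                int id = i + 1; // captured per iteration by the handler below
                IModel channel = connection.CreateModel();

                channel.QueueDeclare(queue: "work", durable: false, exclusive: false,
                                     autoDelete: false, arguments: null);

                // Prefetch of 1: the broker sends this consumer a new message
                // only after the previous one has been acknowledged.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, e) =>
                {
                    Console.WriteLine("Consumer " + id + ": processing started...");
                    Thread.Sleep(1000); // stand-in for the time-consuming processing
                    channel.BasicAck(e.DeliveryTag, false);
                    Console.WriteLine("Consumer " + id + ": processing ended.");
                };

                channel.BasicConsume(queue: "work", autoAck: false, consumer: consumer);
            }

            Console.ReadLine(); // disposing the connection afterwards closes its channels
        }
    }
}
```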

Gabriele Santomaggio answered Sep 22 '22 07:09