
RabbitMQ Java client parallel consumption

I want to process messages from a RabbitMQ queue in parallel. The queue is configured with autoAck=false. I am using the camel-rabbitmq support for Camel endpoints, which has a threadPoolSize parameter, but this does not have the desired effect: messages are still processed serially off the queue, even with threadPoolSize=20.

From debugging through the code, I can see that the threadPoolSize parameter is used to create an ExecutorService that is passed to the RabbitMQ ConnectionFactory, as described here. This all looks good until you get into the RabbitMQ ConsumerWorkService. There, messages are processed in blocks of at most 16 messages. Each message in a block is processed serially, and then, if there is more work to do, the executor service is invoked with the next block. A code snippet of this is below. From this use of the executor service I can't see how the messages can be processed in parallel: the ExecutorService only ever has one piece of work to perform at a time.

What am I missing?

private final class WorkPoolRunnable implements Runnable {

        public void run() {
            int size = MAX_RUNNABLE_BLOCK_SIZE;
            List<Runnable> block = new ArrayList<Runnable>(size);
            try {
                Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
                if (key == null) return; // nothing ready to run
                try {
                    for (Runnable runnable : block) {
                        runnable.run();
                    }
                } finally {
                    if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
                        ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
                    }
                }
            } catch (RuntimeException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
mR_fr0g asked Oct 08 '13 13:10




1 Answer

RabbitMQ's documentation is not very clear about this, but even though the ConsumerWorkService uses a thread pool, the pool does not appear to be used to process the messages of a single channel in parallel:

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

(http://www.rabbitmq.com/api-guide.html)

This documentation suggests using one Channel per thread and, in fact, if you simply create as many Channels as the required level of concurrency, messages will be dispatched between the consumers linked to these channels.
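A minimal sketch of the one-channel-per-consumer approach with the RabbitMQ Java client might look like the following. The host ("localhost"), the queue name ("work"), the concurrency of 4, and the basicQos(1) prefetch are assumptions made for illustration and are not taken from the question; it needs the RabbitMQ client library and a running broker.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelConsumersSketch {
    // Assumptions for illustration: an existing queue named "work" on a local broker.
    private static final String QUEUE = "work";
    private static final int CONCURRENCY = 4;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // Pool that runs the consumer callbacks; sized to the number of channels.
        ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);
        Connection connection = factory.newConnection(executor);

        // One channel and one consumer per desired unit of concurrency.
        for (int i = 0; i < CONCURRENCY; i++) {
            final Channel channel = connection.createChannel();
            // Assumption: limit each consumer to one unacknowledged message.
            channel.basicQos(1);
            // autoAck = false, so each message is acknowledged explicitly below.
            channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    System.out.println(Thread.currentThread().getName()
                            + " got " + new String(body, StandardCharsets.UTF_8));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
        // The open connection keeps the JVM alive and consuming until it is stopped.
    }
}
```

Because each consumer sits on its own channel, a slow handler on one channel should not delay deliveries to the others.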

I've tested with 2 channels and consumers: when 2 messages are in the queue, each consumer only picks one message at a time. The blocks of 16 messages you mentioned don't seem to interfere, which is a good thing.
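The behaviour described above can be illustrated without a broker. The following is only a simulation, assuming that each channel is modelled as one serial loop submitted to a shared pool; it does not use the RabbitMQ client, and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simulation only: one serial dispatch loop per "channel" on a shared thread pool.
class ChannelDispatchSketch {

    /**
     * Runs `channels` serial loops on a pool of `poolSize` threads and returns
     * the highest number of handlers observed running at the same time.
     */
    static int maxConcurrentHandlers(int channels, int messagesPerChannel, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int c = 0; c < channels; c++) {
            pool.execute(() -> {
                // Messages of one channel are handled strictly one after another.
                for (int m = 0; m < messagesPerChannel; m++) {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20); // stand-in for message processing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    running.decrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handlers", e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("1 channel, pool of 20:  peak = " + maxConcurrentHandlers(1, 8, 20));
        System.out.println("4 channels, pool of 20: peak = " + maxConcurrentHandlers(4, 8, 20));
    }
}
```

With a single channel the peak is always 1, however large the pool is, which matches the serial processing seen in the question. With four channels the peak can rise to 4, since each loop can occupy its own pool thread.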

As a matter of fact, Spring AMQP also creates several channels to process messages concurrently. This is done by:

  • setting SimpleMessageListenerContainer.setConcurrentConsumers(...): http://docs.spring.io/spring-amqp/docs/1.3.6.RELEASE/api/
  • and setting CachingConnectionFactory.setChannelCacheSize(...) accordingly: http://docs.spring.io/spring-amqp/docs/1.3.6.RELEASE/api/

I've also tested this to be working as expected.
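For reference, the two Spring AMQP settings above could be wired together roughly as follows. This is a sketch under assumptions: the host ("localhost"), the queue name ("work"), and the concurrency of 4 are hypothetical, and it needs Spring AMQP on the classpath and a running broker.

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

class SpringConcurrentConsumersSketch {
    public static void main(String[] args) {
        int concurrency = 4; // hypothetical value, chosen for illustration

        // Cache as many channels as there are concurrent consumers.
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setChannelCacheSize(concurrency);

        // Listener invoked by each of the container's consumers.
        MessageListener listener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println(Thread.currentThread().getName()
                        + " got " + new String(message.getBody()));
            }
        };

        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("work"); // assumption: the queue already exists
        container.setConcurrentConsumers(concurrency);
        container.setMessageListener(listener);
        container.afterPropertiesSet();
        container.start();
    }
}
```

In a Spring application these objects would normally be declared as beans, in which case the container is initialised and started by the framework.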

Alexis Seigneurin answered Sep 29 '22 11:09
