 

How does a Celery worker consuming from multiple queues decide which to consume from first?

I am using Celery to perform asynchronous background tasks, with Redis as the backend. I'm interested in the behaviour of a Celery worker in the following situation:

I am running a worker as a daemon using celeryd. This worker has been assigned two queues to consume from via the -Q option:

celeryd -E -Q queue1,queue2 

How does the worker decide where to fetch the next task to consume from? Does it randomly consume a task from either queue1 or queue2? Will it prioritise fetching from queue1 because it is first in the list of arguments passed to -Q?
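
For context, here is a minimal sketch of how tasks might end up on those two queues; the task name is illustrative, and the routing uses the apply_async(queue=...) style shown in the answers below:

from celery import task

@task
def process(item):
    print('processing %s' % item)

# Each call can be routed to a specific queue at publish time:
process.apply_async(('a',), queue='queue1')
process.apply_async(('b',), queue='queue2')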

asked Jan 24 '13 by Jonathan Evans

People also ask

How do you run multiple workers in Celery?

Another alternative is to give each worker process a unique name using the -n argument. I have two Pyramid apps running on the same physical hardware, each with its own Celery instance (within its own virtualenv). Supervisor controls both of them, each with its own supervisord.conf file.

How does Celery define workers?

When you run a celery worker, it creates one parent process to manage the running tasks. This process handles bookkeeping features like sending/receiving queue messages, registering tasks, killing hung tasks, tracking status, etc.

What is the default queue in Celery?

By default, Celery routes all tasks to a single queue and all workers consume this default queue.
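
As a rough sketch (old-style uppercase setting names, as used elsewhere on this page; the task path is hypothetical), the default queue can be renamed and individual tasks routed to other queues via configuration:

# celeryconfig.py -- illustrative only
CELERY_DEFAULT_QUEUE = 'default'  # queue used when no explicit routing is given

CELERY_ROUTES = {
    'myapp.tasks.my_task': {'queue': 'queue1'},  # hypothetical task path routed to queue1
}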

What is concurrency in Celery?

As for --concurrency, Celery by default uses multiprocessing to perform concurrent execution of tasks. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of available CPUs if not set.
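
For illustration, the same limit can also be pinned in configuration rather than on the command line (old-style setting name; the value is an assumption):

# celeryconfig.py -- illustrative only
CELERYD_CONCURRENCY = 4  # roughly equivalent to passing --concurrency=4 to the worker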


2 Answers

From my testing, it processes multiple queues round-robin style.

If I use this test code:

from celery import task
import time


@task
def my_task(item_id):
    time.sleep(0.5)
    print('Processing item "%s"...' % item_id)


def add_items_to_queue(queue_name, items_count):
    for i in xrange(0, items_count):
        my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)


add_items_to_queue('queue1', 10)
add_items_to_queue('queue2', 10)
add_items_to_queue('queue3', 5)

And start the worker with (using django-celery):

`manage.py celery worker -Q queue1,queue2,queue3` 

It outputs:

Processing item "queue1-0"...
Processing item "queue3-0"...
Processing item "queue2-0"...
Processing item "queue1-1"...
Processing item "queue3-1"...
Processing item "queue2-1"...
Processing item "queue1-2"...
Processing item "queue3-2"...
Processing item "queue2-2"...
Processing item "queue1-3"...
Processing item "queue3-3"...
Processing item "queue2-3"...
Processing item "queue1-4"...
Processing item "queue3-4"...
Processing item "queue2-4"...
Processing item "queue1-5"...
Processing item "queue2-5"...
Processing item "queue1-6"...
Processing item "queue2-6"...
Processing item "queue1-7"...
Processing item "queue2-7"...
Processing item "queue1-8"...
Processing item "queue2-8"...
Processing item "queue1-9"...
Processing item "queue2-9"...

So it pulls one item from each queue before going on to the next queue1 item even though ALL of the queue1 tasks were published before the queue2 & 3 tasks.

Note: As @WarLord pointed out, this exact behavior only holds when CELERYD_PREFETCH_MULTIPLIER is set to 1. If it is greater than 1, items will be fetched from the queues in batches: with 4 processes and the prefetch multiplier set to 4, 16 items are pulled from the queues right off the bat, so you won't get exactly the output above, but consumption will still roughly follow round-robin.
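
If you want to reproduce the strict one-item-at-a-time interleaving shown above, a hedged configuration sketch (old-style setting names) would be:

# celeryconfig.py -- illustrative only
CELERYD_PREFETCH_MULTIPLIER = 1  # reserve only one message per worker process at a time
CELERYD_CONCURRENCY = 1          # a single process makes the round-robin order easiest to see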

answered Sep 22 '22 by Troy

NOTE: This answer is deprecated: the latest versions of Celery work very differently than they did in 2013...

A worker consuming from several queues consumes tasks in FIFO order, and that FIFO order is maintained across the multiple queues too.

Example:

Queue1 : (t1, t2, t5, t7)
Queue2 : (t0,t3,t4,t6)

Assuming t0-t7 represents the order in which the tasks were published:

Order of consumption: t0, t1, t2, t3, t4, t5, t6, t7
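
A hedged sketch of how that publish order could be produced (reusing the my_task example from the answer above; queue names are assumptions), bearing in mind the note that current Celery versions no longer guarantee this consumption order:

# Publish t0..t7 in the global order described above, alternating between the two queues.
publish_order = [
    ('t0', 'Queue2'), ('t1', 'Queue1'), ('t2', 'Queue1'), ('t3', 'Queue2'),
    ('t4', 'Queue2'), ('t5', 'Queue1'), ('t6', 'Queue2'), ('t7', 'Queue1'),
]
for name, queue in publish_order:
    my_task.apply_async((name,), queue=queue)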

answered Sep 19 '22 by Crazyshezy