Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to route a chain of tasks to a specific queue in celery?

When I route a task to a particular queue it works:

task.apply_async(queue='beetroot')

But if I create a chain:

chain = task | task

And then I write

chain.apply_async(queue='beetroot')

It seems to ignore the queue keyword and assigns to the default 'celery' queue.

It would be nice if celery supported routing in chains - all tasks executed sequentially in the same queue.

like image 760
mpaf Avatar asked Feb 19 '13 09:02

mpaf


People also ask

How does Celery task queue work?

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.

What is Celery distributed task queue?

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It's a task queue with focus on real-time processing, while also supporting task scheduling.

Is Celery a task queue?

You can integrate Celery to help with that. Celery is a distributed task queue for UNIX systems. It allows you to offload work from your Python app. Once you integrate Celery into your app, you can send time-intensive tasks to Celery's task queue.


2 Answers

I do it like this:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
like image 156
bigjools Avatar answered Sep 28 '22 09:09

bigjools


Ok I got this one figured out.

You have to add the required execution options like queue= or countdown= to the subtask definition, or through a partial:

subtask definition:

from celery import subtask

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')

partial:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')

Then you execute the chain through:

chain.apply_async()

or,

chain.delay()

And the tasks will be sent to the 'beetroot' queue. Extra execution arguments in this last command will not do anything. It would have been kind of nice to apply all of those execution arguments at the Chain (or Group, or any other Canvas primitives) level.

like image 36
mpaf Avatar answered Sep 28 '22 10:09

mpaf