
How to route tasks to different queues with Celery and Django

I am using the following stack:

  • Python 3.6
  • Celery v4.2.1 (Broker: RabbitMQ v3.6.0)
  • Django v2.0.4.

According to Celery's documentation, running scheduled tasks on different queues should be as easy as defining the corresponding queues for the tasks in CELERY_ROUTES. Nonetheless, all tasks seem to be executed on Celery's default queue.

This is the configuration on my_app/settings.py:

    CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
    CELERY_ROUTES = {
        'app1.tasks.*': {'queue': 'queue1'},
        'app2.tasks.*': {'queue': 'queue2'},
    }
    CELERY_BEAT_SCHEDULE = {
        'app1_test': {
            'task': 'app1.tasks.app1_test',
            'schedule': 15,
        },
        'app2_test': {
            'task': 'app2.tasks.app2_test',
            'schedule': 15,
        },
    }

The tasks are just simple scripts for testing routing:

File app1/tasks.py:

    from my_app.celery import app
    import time


    @app.task()
    def app1_test():
        print('I am app1_test task!')
        time.sleep(10)

File app2/tasks.py:

    from my_app.celery import app
    import time


    @app.task()
    def app2_test():
        print('I am app2_test task!')
        time.sleep(10)

When I run Celery with all the required queues:

celery -A my_app worker -B -l info -Q celery,queue1,queue2 

RabbitMQ shows that only the default queue "celery" is receiving the tasks:

    sudo rabbitmqctl list_queues
    # Tasks executed by each queue:
    #  - celery 2
    #  - queue1 0
    #  - queue2 0

Does somebody know how to fix this unexpected behavior?

Regards,

Ander asked Aug 01 '18 10:08




2 Answers

I got it working. There are a few things to note:

  • According to the Celery 4.2.0 documentation, CELERY_ROUTES should be the variable that defines queue routing, but it only works for me with CELERY_TASK_ROUTES instead. (Celery 4 names this setting task_routes, so if the app loads the Django settings with namespace='CELERY', the key it reads is CELERY_TASK_ROUTES.)
  • Task routing seems to be independent from Celery Beat, so this only works for tasks scheduled manually:

    app1_test.delay()
    app2_test.delay()

or

    app1_test.apply_async()
    app2_test.apply_async()

To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of the file my_app/settings.py would be as follows:

    CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
    CELERY_TASK_ROUTES = {
        'app1.tasks.*': {'queue': 'queue1'},
        'app2.tasks.*': {'queue': 'queue2'},
    }
    CELERY_BEAT_SCHEDULE = {
        'app1_test': {
            'task': 'app1.tasks.app1_test',
            'schedule': 15,
            'options': {'queue': 'queue1'}
        },
        'app2_test': {
            'task': 'app2.tasks.app2_test',
            'schedule': 15,
            'options': {'queue': 'queue2'}
        },
    }
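If repeating the queue name in both CELERY_TASK_ROUTES and every beat entry feels error-prone, the 'options' key could be filled in from the route patterns. The following is only a sketch: with_explicit_queues and BASE_BEAT_SCHEDULE are hypothetical names, and the glob matching via fnmatch is a stand-in for illustration, not Celery's own router.

```python
from copy import deepcopy
from fnmatch import fnmatchcase

# Route patterns copied from the settings above.
CELERY_TASK_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}

# Beat entries written without a hard-coded queue (hypothetical name).
BASE_BEAT_SCHEDULE = {
    'app1_test': {'task': 'app1.tasks.app1_test', 'schedule': 15},
    'app2_test': {'task': 'app2.tasks.app2_test', 'schedule': 15},
}


def with_explicit_queues(schedule, routes, default_queue='celery'):
    """Hypothetical helper: return a copy of `schedule` with options.queue set.

    The first route pattern that matches the task name wins; tasks that
    match no pattern fall back to `default_queue`.
    """
    result = deepcopy(schedule)
    for entry in result.values():
        options = entry.setdefault('options', {})
        if 'queue' in options:
            continue  # keep a queue that was set by hand
        for pattern, route in routes.items():
            if fnmatchcase(entry['task'], pattern):
                options['queue'] = route['queue']
                break
        else:
            options['queue'] = default_queue
    return result


CELERY_BEAT_SCHEDULE = with_explicit_queues(BASE_BEAT_SCHEDULE, CELERY_TASK_ROUTES)
```

With the values above, each entry ends up with the same 'options' dictionary as the hand-written settings.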

And to run Celery listening on those two queues:

celery -A my_app worker -B -l INFO -Q queue1,queue2 

Where

  • -A: name of the project or app.
  • -B: Initiates the task scheduler Celery beat.
  • -l: Defines the logging level.
  • -Q: Defines the queues handled by this worker.
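A worker only consumes from the queues listed after -Q, so a queue that appears in the settings but not on the command line would never be processed. As a rough sketch, the command could be assembled from the settings themselves. The helper names referenced_queues and worker_command are made up for this example, and the dictionaries are copied from the settings shown earlier.

```python
import shlex

CELERY_TASK_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {'task': 'app1.tasks.app1_test', 'schedule': 15,
                  'options': {'queue': 'queue1'}},
    'app2_test': {'task': 'app2.tasks.app2_test', 'schedule': 15,
                  'options': {'queue': 'queue2'}},
}


def referenced_queues(routes, schedule):
    """Collect every queue named in the routes or in a beat entry's options."""
    queues = [route['queue'] for route in routes.values() if 'queue' in route]
    queues += [
        entry['options']['queue']
        for entry in schedule.values()
        if 'queue' in entry.get('options', {})
    ]
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(queues))


def worker_command(app_name, queues, loglevel='INFO'):
    """Build the same worker invocation as above for the given queues."""
    argv = ['celery', '-A', app_name, 'worker', '-B',
            '-l', loglevel, '-Q', ','.join(queues)]
    # Quote each argument so the string is safe to paste into a shell.
    return ' '.join(shlex.quote(arg) for arg in argv)


command = worker_command(
    'my_app', referenced_queues(CELERY_TASK_ROUTES, CELERY_BEAT_SCHEDULE)
)
print(command)  # celery -A my_app worker -B -l INFO -Q queue1,queue2
```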

I hope this saves other developers some time.

Ander answered Sep 22 '22 19:09


Adding the queue parameter to the task decorator may help:

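    from my_app.celery import app
    import time


    # Send this task to queue1 unless a queue is given at call time.
    @app.task(queue='queue1')
    def app1_test():
        print('I am app1_test task!')
        time.sleep(10)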

JPG answered Sep 22 '22 19:09