I am using the following stack:

- Django
- Celery 4.2.0
- RabbitMQ as the broker
According to Celery's documentation, running scheduled tasks on different queues should be as easy as defining the corresponding queues for the tasks in CELERY_ROUTES; nonetheless, all tasks seem to be executed on Celery's default queue.
This is the configuration on my_app/settings.py:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}

CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },
}
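The Celery app itself lives in my_app/celery.py; a minimal sketch of the usual Django + Celery bootstrap it follows (the namespace='CELERY' argument is what makes Celery pick up the CELERY_-prefixed settings above):

# my_app/celery.py - minimal sketch of the standard Django + Celery bootstrap.
import os

from celery import Celery

# Point Celery at the Django settings module before creating the app.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

# Read all CELERY_*-prefixed settings from Django's settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Discover tasks.py modules in all installed Django apps (app1, app2, ...).
app.autodiscover_tasks()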
The tasks are just simple scripts for testing routing:
File app1/tasks.py:
from my_app.celery import app
import time

@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
File app2/tasks.py:
from my_app.celery import app
import time

@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)
When I run Celery with all the required queues:
celery -A my_app worker -B -l info -Q celery,queue1,queue2
RabbitMQ will show that only the default queue "celery" is running the tasks:
sudo rabbitmqctl list_queues
# Tasks executed by each queue:
# - celery 2
# - queue1 0
# - queue2 0
Does anyone know how to fix this unexpected behavior?
Regards,
I got it working; there are a few things to note here:
According to Celery 4.2.0's documentation, CELERY_ROUTES should be the setting that defines queue routing, but it only works for me as CELERY_TASK_ROUTES instead. Task routing also seems to be independent from Celery Beat, so on its own it only applies to tasks triggered manually:
app1_test.delay()
app2_test.delay()
or
app1_test.apply_async()
app2_test.apply_async()
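For a one-off override, apply_async also accepts an explicit queue argument (standard Celery calling API):

# Send this one invocation to queue2 regardless of CELERY_TASK_ROUTES:
app1_test.apply_async(queue='queue2')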
To make it work with Celery Beat, we just need to define the queues explicitly in the CELERY_BEAT_SCHEDULE variable. The final setup of my_app/settings.py would be as follows:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_TASK_ROUTES = {
    'app1.tasks.*': {'queue': 'queue1'},
    'app2.tasks.*': {'queue': 'queue2'},
}

CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },
}
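Note that everything in a beat entry's 'options' dict is forwarded to apply_async(), so other execution options can be set per entry as well; for example (the expires value below is just an illustration, not part of my setup):

CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        # Forwarded to apply_async(); 'expires': 10 discards the task if no
        # worker picks it up within 10 seconds (illustrative value only).
        'options': {'queue': 'queue1', 'expires': 10},
    },
}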
And to run Celery listening on those two queues:
celery -A my_app worker -B -l INFO -Q queue1,queue2
Where:

- -A: the name of the project or app.
- -B: starts the Celery beat task scheduler.
- -l: sets the logging level.
- -Q: defines the queues handled by this worker.

I hope this saves some time for other developers.
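One more note: instead of embedding beat in a worker with -B, beat can also run as its own process (standard Celery CLI, shown here as an alternative rather than part of the setup above):

celery -A my_app worker -l INFO -Q queue1,queue2
celery -A my_app beat -l INFO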
Adding the queue parameter to the task decorator may help you:
@app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
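If you go this route, the decorator value acts as the default destination for calls, while an explicit queue passed to apply_async still wins; a quick sketch under that assumption:

app1_test.delay()                      # routed to queue1 via the decorator default
app1_test.apply_async(queue='queue2')  # explicit queue overrides the default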