I'm trying to keep multiple Celery queues with different tasks and workers in the same Redis database. Really it's just a convenience issue of only wanting one Redis server rather than two on my machine.
I followed the Celery tutorial docs verbatim, as it was the only way I could get it to work. Now when I try to duplicate everything with slightly tweaked names/queues, it keeps erroring out.
Note - I'm newish to Python and Celery, which is obviously part of the problem. I'm not sure which instances of "task/tasks" are just names versus special keywords.
My condensed version of the docs: run

celery -A tasks worker

to spawn the workers. tasks.py contains the task code, with

celery = Celery('tasks', broker='redis://localhost')

to connect to Celery, and @task() above the functions I want to delay.

Within my program for queueing tasks:

from tasks import do_work
do_work.delay()
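Put together, my setup looks roughly like this (a minimal sketch of what I described above; the do_work body is just a placeholder, not the actual code from the docs):

```python
# tasks.py -- minimal sketch of the setup described above
from celery import Celery

# connect to the local Redis broker, as in the tutorial
celery = Celery('tasks', broker='redis://localhost')

@celery.task()
def do_work():
    # placeholder body; my real tasks do actual work here
    print('working')
```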
So given all of the above, what are the steps I need to take to turn this into two types of tasks that run independently on separate queues and workers? For example, blue_tasks and red_tasks?
I've tried changing all instances of tasks to blue_tasks or red_tasks. However, when I queue blue_tasks, the red_tasks workers I've started up start trying to work on them.
I read about default queues and such, so I tried this code, which didn't work:

CELERY_DEFAULT_QUEUE = 'red'
CELERY_QUEUES = (
    Queue('red', Exchange('red'), routing_key='red'),
)
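For completeness, the Queue and Exchange classes in that snippet come from kombu (Celery's messaging library), so a self-contained version of the attempted config would be:

```python
# celeryconfig.py -- the attempted config, with the imports it needs
# (Queue and Exchange come from kombu, Celery's messaging library)
from kombu import Queue, Exchange

CELERY_DEFAULT_QUEUE = 'red'
CELERY_QUEUES = (
    Queue('red', Exchange('red'), routing_key='red'),
)
```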
As a side note, I don't understand why celery worker errors out with Celery attempting to connect to a default amqp instance, while celery -A tasks worker tells Celery to connect to Redis. What task code is celery worker attempting to run on the worker if nothing has been specified?
By default everything goes into a default queue named celery (and this is what celery worker will process if no queue is specified).

So say you have your do_work task function in django_project_root/myapp/tasks.py. You could configure the do_work task to live in its own queue like so:
CELERY_ROUTES = {
    'myproject.tasks.do_work': {'queue': 'red'},
}
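Adapting that pattern to the blue/red example from the question, the routing config might look something like this (the myapp.tasks module path and task names are assumptions — match them to wherever your task functions actually live):

```python
# celeryconfig.py -- route each task to its own queue
# ('myapp.tasks.blue_work' / 'myapp.tasks.red_work' are hypothetical
# task paths; substitute your own module and function names)
CELERY_ROUTES = {
    'myapp.tasks.blue_work': {'queue': 'blue'},
    'myapp.tasks.red_work': {'queue': 'red'},
}
```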
Then run a worker using celery worker -Q red, and it will only process things in that queue (another worker invoked with plain celery worker will only pick up things in the default queue).
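Conceptually, the route selection is just a dictionary lookup with a fallback to the default queue name — a rough sketch of the idea (not Celery's actual internals):

```python
# Conceptual sketch of route selection (not Celery's real internals):
# a task's queue comes from the routes mapping, falling back to the
# default queue name ('celery') when no route matches.
CELERY_ROUTES = {
    'myproject.tasks.do_work': {'queue': 'red'},
}

def queue_for(task_name, routes, default='celery'):
    return routes.get(task_name, {}).get('queue', default)

print(queue_for('myproject.tasks.do_work', CELERY_ROUTES))  # red
print(queue_for('myproject.tasks.other', CELERY_ROUTES))    # celery
```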
The task routing section in the documentation should explain all.