How do you limit the number of instances of a specific Celery task that can be run simultaneously?
I have a task that processes large files. I'm running into a problem where a user may launch several tasks, causing the server to run out of CPU and memory as it tries to process too many files at once. I want to ensure that only N instances of this one type of task are run at any given time, and that other tasks will sit queued in the scheduler until the others complete.
I see there's a rate_limit option in the task decorator, but I don't think this does what I want. If I'm understanding the docs correctly, this will just limit how quickly the tasks are launched, but it won't restrict the overall number of tasks running, so my server will crash more slowly...but it will still crash nonetheless.
You probably just need to add the --concurrency or -c argument when starting the worker to spawn multiple (parallel) worker processes. You can also look at the Canvas primitives, which show how to build groups for parallel execution.
There's no way to stop the worker with celery multi command. You can get PID from pid file or from ps output, e.g. What?! According to the celery multi help message, there is a celery multi kill worker1 command.
As for --concurrency: by default, Celery uses multiprocessing to execute tasks concurrently. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of available CPUs if not set.
The Celery worker is set to time out after 10 seconds, with a soft timeout of 5 seconds. A delay is introduced into the Celery task in order to cause a timeout. The timeout exception is never raised by the Celery worker.
You have to set up an extra queue and set the desired concurrency level for it. From Routing Tasks:
# Old config style
CELERY_ROUTES = {
    'app.tasks.limited_task': {'queue': 'limited_queue'}
}
or
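from kombu import Exchange, Queue
# `celery` is your Celery app instance.
# default_exchange must be defined before the queues that use it.
default_exchange = Exchange('default', type='direct')
celery.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('limited_queue', default_exchange, routing_key='limited_queue'),
)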
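For reference, here is a rough sketch of how the route and the queue declarations might fit together with the newer lowercase setting names. The app name, the broker URL, and the task body are placeholders I made up for illustration, not something from the question, so adjust them to your project.

```python
from celery import Celery
from kombu import Exchange, Queue

# Hypothetical app name and broker URL; substitute your own.
app = Celery('celery_app', broker='redis://localhost:6379/0')

default_exchange = Exchange('default', type='direct')

# Tasks without an explicit route go to the 'default' queue.
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('limited_queue', default_exchange, routing_key='limited_queue'),
)

# Send only the heavy task to the queue served by the small worker.
app.conf.task_routes = {
    'app.tasks.limited_task': {'queue': 'limited_queue'},
}


@app.task(name='app.tasks.limited_task')
def limited_task(path):
    # Placeholder for the expensive file processing.
    return path
```

With this in place, calling limited_task.delay(...) should publish to limited_queue, and the cap on parallel runs comes from the -c value of whichever worker consumes that queue.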
Then start an extra worker that serves only limited_queue:
$ celery -A celery_app worker -Q limited_queue --loglevel=info -c 1 -n limited_queue
Then you can check that everything is running smoothly using Flower or the inspect command:
$ celery -A celery_app inspect --help
What you can do is push these tasks to a dedicated queue and have X worker processes consuming from it. For example, two workers, each started with a concurrency of 1, on a queue holding 100 items will ensure that only two tasks are processed at the same time.
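The principle does not depend on Celery. As a minimal stdlib-only sketch (a thread pool stands in for the workers, and process_file and MAX_PARALLEL are names invented for this example), a fixed number of consumers on a queue caps how many jobs run at once, while the rest simply wait:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL = 2  # plays the role of the worker's -c value

running = 0       # jobs currently executing
peak = 0          # highest value `running` ever reached
lock = threading.Lock()


def process_file(i):
    """Pretend to process one file while tracking concurrency."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.01)  # stand-in for the expensive work
    with lock:
        running -= 1
    return i


# 20 jobs are queued, but only MAX_PARALLEL are picked up at a time.
with ThreadPoolExecutor(max_workers=MAX_PARALLEL) as pool:
    results = list(pool.map(process_file, range(20)))

print(f"processed {len(results)} jobs, peak concurrency {peak}")
```

The peak never exceeds MAX_PARALLEL no matter how many jobs are submitted, which is the behavior the dedicated queue plus a low-concurrency worker gives you for one task type.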