Some of the tasks in my code were taking longer and longer to execute.
Upon inspection, I noticed that although my worker node is set to concurrency 6 and 6 processes exist to 'do work', only 1 task is shown under 'running tasks'. Here is a little visual proof:
Here are the worker options:
And here is the task tab for that worker with only 1 running process:
I have found that if I restart celery, the concurrency is once again respected and I will see >1 running task, but after some amount of time/tasks it reverts to this behavior.
Any ideas for fixing this intermittent problem?
As for --concurrency: by default, Celery uses multiprocessing to execute tasks concurrently. The number of worker processes/threads can be changed with the --concurrency argument and defaults to the number of available CPUs if not set.
celery -A yourproject.app status will show which of your workers are online. celery -A yourproject.app inspect active will give you the list of tasks currently running, etc.
Time limits: the time limit is set as two values, soft and hard. The soft time limit allows the task to catch an exception and clean up before it is killed; the hard time limit can't be caught and force-terminates the task.
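To make the soft-limit idea concrete, here is a rough standard-library sketch, not Celery's own implementation. It assumes a Unix system and code running in the main thread, and it uses SIGALRM as a stand-in for however the worker actually delivers the soft limit. The names SoftTimeLimitExceeded (a local stand-in for the Celery exception of the same name), run_with_soft_limit and slow_task are made up for illustration.

```python
import signal
import time


class SoftTimeLimitExceeded(Exception):
    """Local stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _raise_soft_limit(signum, frame):
    # Runs in the main thread when the timer fires and interrupts the task.
    raise SoftTimeLimitExceeded()


def run_with_soft_limit(func, soft_limit):
    """Call func(), raising SoftTimeLimitExceeded inside it after soft_limit seconds."""
    previous = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.setitimer(signal.ITIMER_REAL, soft_limit)
    try:
        return func()
    finally:
        # Cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def slow_task():
    try:
        time.sleep(5)  # pretend this is the long-running work
        return "finished"
    except SoftTimeLimitExceeded:
        # The task gets a chance to clean up before giving up.
        return "cleaned up"
```

Calling run_with_soft_limit(slow_task, 0.1) returns "cleaned up". A hard limit has no equivalent hook in the task: the process is simply terminated, so nothing in slow_task would get to run.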
Celery itself is using billiard (a multiprocessing fork) to run your tasks in separate processes.
I'm not sure if it's your use case, but I ran into similar problems when I had a mix of long and short tasks. Basically what happened is that at some point a process could start a very long running task, while prefetching a few other tasks, preventing them from being consumed by other processes. So I disabled the prefetching stuff, which is useful only if you're running a lot of short tasks.
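The effect can be seen in a toy model. This is only a sketch under simplifying assumptions (tasks dealt out round-robin in advance versus handed to whichever process is free first), not how Celery schedules internally, and the function names are invented for the example.

```python
import heapq


def finish_time_prefetch(durations, workers):
    """Tasks are dealt out round-robin up front; each process works through its own backlog."""
    backlog = [0] * workers
    for i, duration in enumerate(durations):
        backlog[i % workers] += duration
    return max(backlog)


def finish_time_fair(durations, workers):
    """Each task goes to the process that becomes free first."""
    free_at = [0] * workers  # a list of equal values is already a valid heap
    for duration in durations:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + duration)
    return max(free_at)


# A mix of long (60 s) and short (1 s) tasks on 2 processes.
durations = [60, 1, 1, 1, 60, 1, 1, 1]
print(finish_time_prefetch(durations, 2))  # 122: both long tasks land on the same process
print(finish_time_fair(durations, 2))      # 63: short tasks flow around the long ones
```

In the first case one process sits idle for most of the run while short tasks wait behind the long ones, which looks a lot like a worker with several processes but only one running task.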
To disable the prefetch, you need Celery 3.1+ and the -Ofair option, for instance:
celery -A proj worker -l info -Ofair
The docs mention here how to reserve one task at a time - or only as many as you have concurrency:
Often users ask if disabling “prefetching of tasks” is possible, but what they really mean by that, is to have a worker only reserve as many tasks as there are worker processes (10 unacknowledged tasks for -c 10)
That’s possible, but not without also enabling late acknowledgment. Using this option over the default behavior means a task that’s already started executing will be retried in the event of a power failure or the worker instance being killed abruptly, so this also means the task must be idempotent ... You can enable this behavior by using the following configuration options:
task_acks_late = True
worker_prefetch_multiplier = 1
or the code equivalent:
app = Celery(...)
app.conf.worker_prefetch_multiplier = 1
app.conf.task_acks_late = True
...
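As a quick sanity check on what these settings change, the number of tasks a worker may reserve is its concurrency multiplied by worker_prefetch_multiplier. The helper below is hypothetical, just arithmetic, and assumes the default multiplier of 4:

```python
def reserved_task_limit(concurrency, prefetch_multiplier=4):
    """Upper bound on unacknowledged tasks a worker holds at once."""
    return concurrency * prefetch_multiplier


print(reserved_task_limit(6))      # 24 with the default multiplier
print(reserved_task_limit(6, 1))   # 6 with worker_prefetch_multiplier = 1
print(reserved_task_limit(10, 1))  # 10, the "-c 10" case from the docs quote
```

So with concurrency 6 and default settings, a single worker can hold up to 24 tasks, some of which may be waiting behind a long-running one.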