I have some fairly busy Celery queues, but I'm not sure which tasks are the problematic ones. Is there a way to aggregate results to figure out which tasks are taking a long time? I have 10-20 workers on 2-4 servers.
I'm using Redis as the broker and as the result backend. I noticed the busy queues in Flower, but I can't figure out how to get timing statistics aggregated per task.
Names: every task must have a unique name. If no explicit name is provided, the task decorator will generate one for you, based on 1) the module the task is defined in, and 2) the name of the task function.
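For example, a minimal sketch (the module name tasks and the Redis URL are assumptions; adjust for your project):

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(name='tasks.add')   # explicit name; without name= the decorator generates '<module>.add'
def add(x, y):
    return x + y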
Now open another terminal, cd into examples/queue-based-distribution/, and run celery -A celery-task-queue status. The basic monitoring command is status, which reports the state of the workers and the number of nodes online.
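The output looks roughly like this (hostnames and counts will differ on your setup):

$ celery -A celery-task-queue status
->  celery@worker1: OK
->  celery@worker2: OK

2 nodes online.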
A task is just a Python function. You can think of scheduling a task as a time-delayed call to the function. For example, you might ask Celery to call your function task1 with arguments (1, 3, 3) after five minutes. Or you could have your function batchjob called every night at midnight.
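Both patterns map onto apply_async and the beat schedule. A minimal sketch (the module name tasks, the Redis URL, and the task bodies are assumptions):

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def task1(a, b, c):
    return a + b + c

@app.task
def batchjob():
    pass

# Call task1(1, 3, 3) roughly five minutes from now.
task1.apply_async(args=(1, 3, 3), countdown=300)

# Run batchjob every night at midnight (needs a celery beat process alongside the workers).
app.conf.beat_schedule = {
    'nightly-batchjob': {
        'task': 'tasks.batchjob',
        'schedule': crontab(hour=0, minute=0),
    },
}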
Method 1:
If you enable logging when the Celery workers are started, they log the time taken for each task.
$ celery worker -l info -A your_app --logfile celery.log
This will generate logs like this
[2016-06-04 13:21:30,749: INFO/MainProcess] Task sig.add[a8b648eb-9674-44f0-90bd-71cfebe22f2f] succeeded in 0.00979363399983s: 3
[2016-06-04 13:21:30,973: INFO/MainProcess] Received task: sig.add[7fd422e6-8f48-4dd2-90de-e213afbedc38]
[2016-06-04 13:21:30,982: WARNING/Worker-2] called by small_task. LOL {'signal': <Signal: Signal>, 'result': 3, 'sender': <@task: sig.add of tasks:0x7fdf33146c50>}
You can filter the lines that contain succeeded in, split them using space, [ and : as delimiters, print the task name and the time taken by each task, and then sort all the lines:
$ grep ' succeeded in ' celery.log | awk -F'[ :\[]' '{print $9, $13}' | sort
awk: warning: escape sequence `\[' treated as plain `['
sig.add 0.00775764500031s
sig.add 0.00802627899975s
sig.foo 12.00813863099938s
sig.foo 15.00871706100043s
sig.foo 12.00979363399983s
As you can see, add is very fast and foo is slow.
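Going one step further, you can let awk aggregate the times per task instead of just listing them. A sketch using the same delimiters and field positions as the command above (gawk prints the same harmless warning about \[); the output shown is the average of the sample lines above:

$ grep ' succeeded in ' celery.log | awk -F'[ :\[]' '
    { sum[$9] += $13; n[$9]++ }
    END { for (t in sum) printf "%s  avg %.5fs  (%d runs)\n", t, sum[t]/n[t], n[t] }'
sig.add  avg 0.00789s  (2 runs)
sig.foo  avg 13.00888s  (3 runs)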
Method 2:
Celery has task_prerun and task_postrun signals which fire before and after each task runs. You can hook up handler functions that track the time and record it somewhere.
from time import time
from collections import namedtuple

from celery.signals import task_prerun, task_postrun

tasks = {}            # task_id -> start timestamp
task_avg_time = {}    # task name -> Average(cum_avg, count)
Average = namedtuple('Average', 'cum_avg count')


@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, **kwargs):
    # Remember when each task started.
    tasks[task_id] = time()


@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, **kwargs):
    try:
        cost = time() - tasks.pop(task_id)
    except KeyError:
        # No matching prerun record (e.g. the worker restarted mid-task).
        return

    try:
        cum_avg, count = task_avg_time[task.name]
        new_count = count + 1
        new_avg = ((cum_avg * count) + cost) / new_count
        task_avg_time[task.name] = Average(new_avg, new_count)
    except KeyError:
        # First time we have seen this task name.
        task_avg_time[task.name] = Average(cost, 1)
    # write to redis: task_avg_time
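Since task_avg_time lives in each worker process, you need to push it somewhere shared (you already have Redis) to aggregate across your 10-20 workers. A minimal sketch assuming redis-py is installed, Redis runs locally, and the hash key name celery_task_avg_time is arbitrary; call it from the postrun handler or on a timer:

import json
import redis

r = redis.Redis()  # adjust host/port/db for your broker setup

def flush_stats():
    # Store each task's running average so it can be inspected from any worker.
    for name, avg in task_avg_time.items():
        r.hset('celery_task_avg_time', name, json.dumps(avg._asdict()))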
References: https://stackoverflow.com/a/31731622/2698552