Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Check if in celery task

How to check that a function in executed by celery?

def notification():
   # in_celery() returns True if called from celery_test(), 
   #                     False if called from not_celery_test()
   if in_celery():
      # Send mail directly without creation of additional celery subtask
      ...
   else:
      # Send mail with creation of celery task
      ...

@celery.task()
def celery_test():
    notification()

def not_celery_test():
    notification()
like image 977
Peter Avatar asked Dec 24 '22 11:12

Peter


1 Answers

Here is one way to do it by using celery.current_task. Here is the code to be used by the task:

def notification():
    from celery import current_task
    if not current_task:
        print "directly called"
    elif current_task.request.id is None:
        print "called synchronously"
    else:
        print "dispatched"

@app.task
def notify():
    notification()

This is code you can run to exercise the above:

        from core.tasks import notify, notification
        print "DIRECT"
        notification()
        print "NOT DISPATCHED"
        notify()
        print "DISPATCHED"
        notify.delay().get()

My task code in the first snippet was in a module named core.tasks. And I shoved the code in the last snippet in a custom Django management command. This tests 3 cases:

  • Calling notification directly.

  • Calling notification through a task executed synchronously. That is, this task is not dispatched through Celery to a worker. The code of the task executes in the same process that calls notify.

  • Calling notification through a task run by a worker. The code of the task executes in a different process from the process that started it.

The output was:

NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called

There is no line from the print in the task on the output after DISPATCHED because that line ends up in the worker log:

[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched

Important note: I initially was using if current_task is None in the first test but it did not work. I checked and rechecked. Somehow Celery sets current_task to an object which looks like None (if you use repr on it, you get None) but is not None. Unsure what is going on there. Using if not current_task works.

Also, I've tested the code above in a Django application but I've not used it in production. There may be gotchas I don't know.

like image 128
Louis Avatar answered Dec 28 '22 08:12

Louis