Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery and signals

I used to have a function like this

def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result)

    return result

Where result_higher_then_four obviously represents a signal.

Then I introduced celery and my function looked like below and I never received a signal again. I suppose signals are bound per process and as celery runs in a different process, this means I cannot catch the signal in the main process. Should I use a thread_local to fix this? Or am I overlooking the obvious?

Thanks

@task
def calculate(self, input):
    result = input * 2

    if result > 4:
        result_higher_then_four.send(result)

    return result
like image 548
user2298943 Avatar asked Sep 03 '14 14:09

user2298943


People also ask

How do you use celery signals?

signals import task_prerun, task_postrun app = Celery('tasks', broker='redis://localhost:6379/0') @app. task def add(x, y): return x + y @task_prerun. connect(sender=add) def task_prerun_notifier(sender=None, **kwargs): print("From task_prerun_notifier ==> Running just before add() executes") @task_postrun.

What is celery and how it works?

Celery is a distributed task queue for UNIX systems. It allows you to offload work from your Python app. Once you integrate Celery into your app, you can send time-intensive tasks to Celery's task queue.

What is celery signature?

What are Celery Signatures ? Signatures are used when you want to send the signature of a task invocation to another process or as an argument to another function. A signature() wraps the args, kwargs, and execution options of a single task invocation.

How do I stop celery task?

If a task is revoked, the workers ignore the task and do not execute it. If you don't use persistent revokes your task can be executed after worker's restart. revoke has an terminate option which is False by default. If you need to kill the executing task you need to set terminate to True.


2 Answers

You can use the celeryd_init signal to initialize your workers and signals http://celery.readthedocs.org/en/latest/userguide/signals.html#celeryd-init

Based on what you provided, I've tested with:

from celery.signals import celeryd_init
from celery.utils.dispatch import Signal

def process_result(result, *args, **kwargs):
    print "signals received: %s" % result

result_higher_then_four = Signal()

@celeryd_init.connect
def init_signals(*args, **kwargs):
    result_higher_then_four.connect(process_result)

@task(bind=True)
def calculate(self, input):
    result = input * 2

    if result > 4:
       result_higher_then_four.send(result=result, sender=self)

    return result
like image 138
ant31 Avatar answered Sep 29 '22 23:09

ant31


The problem is that the signal receiver isn't getting registered. The celery workers run in their own process so the signal connections need to be made in that process. If you know what they are or can discover them, you can register them during task initialization using this technique.

Of course, that eliminates some of the benefit of using signals in the first place because you need to know the connections in advance.

One idea is to assume that the signal receivers will always register in the models module of each app. In which case the following will work.

class CalculateTask(celery.Task):

    def __init__(self):
        from django.conf import settings
        for app in settings.INSTALLED_APPS:
            app_models = '{}.{}'.format(app,'models') 
            __import__(app_models, globals=globals())                                 

    def run(self, input):
        result = input * 2
        if result > 4:
            result_higher_then_four.send(result)

        return result
like image 28
joshua Avatar answered Sep 29 '22 21:09

joshua