I used to have a function like this
def calculate(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result)
return result
Where result_higher_then_four
obviously represents a signal.
Then I introduced celery and my function looked like below and I never received a signal again. I suppose signals are bound per process and as celery runs in a different process, this means I cannot catch the signal in the main process. Should I use a thread_local
to fix this? Or am I overlooking the obvious?
Thanks
@task
def calculate(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result)
return result
signals import task_prerun, task_postrun app = Celery('tasks', broker='redis://localhost:6379/0') @app. task def add(x, y): return x + y @task_prerun. connect(sender=add) def task_prerun_notifier(sender=None, **kwargs): print("From task_prerun_notifier ==> Running just before add() executes") @task_postrun.
Celery is a distributed task queue for UNIX systems. It allows you to offload work from your Python app. Once you integrate Celery into your app, you can send time-intensive tasks to Celery's task queue.
What are Celery Signatures ? Signatures are used when you want to send the signature of a task invocation to another process or as an argument to another function. A signature() wraps the args, kwargs, and execution options of a single task invocation.
If a task is revoked, the workers ignore the task and do not execute it. If you don't use persistent revokes your task can be executed after worker's restart. revoke has an terminate option which is False by default. If you need to kill the executing task you need to set terminate to True.
You can use the celeryd_init signal to initialize your workers and signals http://celery.readthedocs.org/en/latest/userguide/signals.html#celeryd-init
Based on what you provided, I've tested with:
from celery.signals import celeryd_init
from celery.utils.dispatch import Signal
def process_result(result, *args, **kwargs):
print "signals received: %s" % result
result_higher_then_four = Signal()
@celeryd_init.connect
def init_signals(*args, **kwargs):
result_higher_then_four.connect(process_result)
@task(bind=True)
def calculate(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result=result, sender=self)
return result
The problem is that the signal receiver isn't getting registered. The celery workers run in their own process so the signal connections need to be made in that process. If you know what they are or can discover them, you can register them during task initialization using this technique.
Of course, that eliminates some of the benefit of using signals in the first place because you need to know the connections in advance.
One idea is to assume that the signal receivers will always register in the models module of each app. In which case the following will work.
class CalculateTask(celery.Task):
def __init__(self):
from django.conf import settings
for app in settings.INSTALLED_APPS:
app_models = '{}.{}'.format(app,'models')
__import__(app_models, globals=globals())
def run(self, input):
result = input * 2
if result > 4:
result_higher_then_four.send(result)
return result
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With