When any Celery task is enqueued, I want to attach contextual metadata that the worker can use.
The following code works, but I would like a proper Celery-style solution:
```python
from celery.signals import before_task_publish, task_prerun


@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    # body[1] is the dict of keyword arguments the task will be called with
    task_kwargs = body[1]
    metadata = {"foo": "bar"}
    task_kwargs['__metadata__'] = metadata


@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    # Remove the injected key so the task function never receives it
    metadata = kwargs['kwargs'].pop('__metadata__', {})
    # metadata == {"foo": "bar"}
```
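To see why the question's version has to pop the key again, here is a broker-free sketch of the same idea using only the standard library. It assumes the protocol-2 body layout `(args, kwargs, embed)`; the trimmed handler signatures and the `add` task are hypothetical stand-ins, and no Celery API is called.

```python
# Hypothetical stand-ins: the message body is modelled as an (args, kwargs, embed)
# tuple, and the two handlers mirror the question's signal receivers.


def receiver_before_task_publish(body):
    """Publish side: inject metadata into the task kwargs (body[1])."""
    task_kwargs = body[1]
    task_kwargs["__metadata__"] = {"foo": "bar"}


def receiver_task_pre_run(kwargs):
    """Worker side: remove the metadata before the task function runs."""
    return kwargs.pop("__metadata__", {})


def add(x, y):
    """An ordinary task function that knows nothing about metadata."""
    return x + y


# Publish side: args, kwargs, embed
body = ((2,), {"y": 3}, {})
receiver_before_task_publish(body)

# Simulated worker: the same kwargs dict goes to the prerun hook, then to the task
args, kwargs, _embed = body
metadata = receiver_task_pre_run(kwargs)
result = add(*args, **kwargs)

print(metadata)  # {'foo': 'bar'}
print(result)    # 5
```

In this sketch, skipping the `pop()` would make `add(*args, **kwargs)` raise a `TypeError` for the unexpected `__metadata__` keyword, which is the main drawback of smuggling metadata through the task's own kwargs.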
When a task starts in the worker, the content of the `headers` passed to `before_task_publish` ends up in the `**kwargs` of `push_request`.
`celery/app/task.py`, line 1000:
```python
def push_request(self, *args, **kwargs):
    self.request_stack.push(Context(*args, **kwargs))
```
The constructor of `Context` does something convenient: it calls `self.__dict__.update()`, so every keyword argument becomes an attribute. For example, `Context(metadata={'foo': 'bar'}).metadata` returns `{'foo': 'bar'}`.
`celery/app/task.py`, line 99:
```python
class Context(object):
    # ...
    def __init__(self, *args, **kwargs):
        self.update(*args, **kwargs)

    def update(self, *args, **kwargs):
        return self.__dict__.update(*args, **kwargs)
```
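A minimal stand-in for `Context` shows how header keys turn into attributes. The class name `MiniContext` is hypothetical, and everything except the `__dict__.update()` trick is left out.

```python
class MiniContext:
    """Hypothetical, stripped-down stand-in for Celery's Context."""

    def __init__(self, *args, **kwargs):
        self.update(*args, **kwargs)

    def update(self, *args, **kwargs):
        # Every key becomes an instance attribute
        return self.__dict__.update(*args, **kwargs)


ctx = MiniContext({"id": "abc"}, __metadata__={"foo": "bar"})
print(ctx.id)            # abc
print(ctx.__metadata__)  # {'foo': 'bar'}
```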
The task context is accessible from the `request` property of `Task`.
`celery/app/task.py`, line 1019:
```python
class Task(object):
    # ...
    def _get_request(self):
        """Get current request object."""
        req = self.request_stack.top
        if req is None:
            # task was not called, but some may still expect a request
            # to be there, perhaps that should be deprecated.
            if self._default_request is None:
                self._default_request = Context()
            return self._default_request
        return req
    request = property(_get_request)
```
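The push, peek and fallback behaviour described above can be modelled without Celery. This is a simplified sketch, not Celery's code: a plain list replaces Celery's stack object, and `Task` and `Context` here are hypothetical minimal classes.

```python
class Context:
    """Hypothetical minimal context: keyword arguments become attributes."""

    def __init__(self, *args, **kwargs):
        self.__dict__.update(*args, **kwargs)


class Task:
    """Hypothetical minimal task holding a stack of request contexts."""

    def __init__(self):
        self.request_stack = []
        self._default_request = None

    def push_request(self, *args, **kwargs):
        self.request_stack.append(Context(*args, **kwargs))

    def pop_request(self):
        self.request_stack.pop()

    @property
    def request(self):
        # Fall back to a cached, empty Context when nothing has been pushed
        if not self.request_stack:
            if self._default_request is None:
                self._default_request = Context()
            return self._default_request
        return self.request_stack[-1]


task = Task()
headers = {"id": "abc", "__metadata__": {"foo": "bar"}}
task.push_request(**headers)
during = getattr(task.request, "__metadata__", {})
task.pop_request()
after = getattr(task.request, "__metadata__", {})

print(during)  # {'foo': 'bar'}
print(after)   # {}
```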
Which means the final solution is simply this:
```python
from celery.signals import before_task_publish, task_prerun


@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    # Attach the metadata to the message headers instead of the task kwargs
    metadata = {"foo": "bar"}
    headers['__metadata__'] = metadata


@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    # On the worker, the headers are available as attributes of task.request
    metadata = getattr(task.request, '__metadata__', {})
    # metadata == {"foo": "bar"}
```
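The flow of this solution can be simulated end to end, with `types.SimpleNamespace` standing in for `task.request`. The handler signatures are trimmed to the arguments they use, and the header values and body are made up for illustration.

```python
from types import SimpleNamespace


def receiver_before_task_publish(headers):
    # Publish side: attach metadata to the message headers, not the task kwargs
    headers["__metadata__"] = {"foo": "bar"}


def receiver_task_pre_run(task):
    # Worker side: read it back from the request, defaulting to {} if absent
    return getattr(task.request, "__metadata__", {})


# Hypothetical message: made-up header values and a protocol-2 style body
headers = {"id": "abc", "task": "tasks.add"}
body = ((2,), {"y": 3}, {})
receiver_before_task_publish(headers)

# Simulated worker: header keys become attributes of task.request
task = SimpleNamespace(request=SimpleNamespace(**headers))
metadata = receiver_task_pre_run(task)

print(metadata)  # {'foo': 'bar'}
print(body[1])   # {'y': 3}
```

Unlike the body-based version in the question, the task's kwargs never see the metadata, so there is nothing to pop.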
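Note: `task.request.__metadata__` would also work, but it raises an `AttributeError` for tasks that were enqueued before the signal handlers were connected, because those messages carry no `__metadata__` header. Using `getattr()` with a default is safer.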
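The difference between the two access styles shows up with a request built from an older message that lacks the header. `SimpleNamespace` stands in for the request object here; this is an illustration, not Celery's own class.

```python
from types import SimpleNamespace

# Request built from a message published before the handler was connected:
# it carries no "__metadata__" header
old_request = SimpleNamespace(id="abc")

try:
    old_request.__metadata__
    direct = "found"
except AttributeError:
    direct = "AttributeError"

safe = getattr(old_request, "__metadata__", {})

print(direct)  # AttributeError
print(safe)    # {}
```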