
In celery, what is the appropriate way to pass contextual metadata from sender process to worker when a task is enqueued?

When any celery task is enqueued I want to add contextual metadata the worker will be able to use.

The following code example works, but I would like an appropriate, Celery-style solution.

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    task_kwargs = body[1]
    metadata = {"foo": "bar"}
    task_kwargs['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    metadata = kwargs['kwargs'].pop('__metadata__', {})
    # metadata == {"foo": "bar"}
asked Apr 13 '19 by jrobichaud



1 Answer

When a task starts in the worker, the content of before_task_publish's headers ends up in the **kwargs of push_request.

celery/app/task.py:1000

    def push_request(self, *args, **kwargs):
        self.request_stack.push(Context(*args, **kwargs))

The constructor of Context does something convenient: it calls self.__dict__.update(), so every keyword argument becomes an attribute and the values can be accessed as Context(metadata={'foo': 'bar'}).metadata.

celery/app/task.py:99

class Context(object):
# ...
    def __init__(self, *args, **kwargs):
        self.update(*args, **kwargs)

    def update(self, *args, **kwargs):
        return self.__dict__.update(*args, **kwargs)
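
A quick illustration of that behaviour (a sketch, assuming Context is imported from the celery/app/task.py module quoted above):

from celery.app.task import Context

# Keyword arguments are copied straight into __dict__, so they become
# plain attributes on the context object.
ctx = Context(__metadata__={"foo": "bar"})
print(ctx.__metadata__)  # {'foo': 'bar'}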

The task context is accessible from Task's request property.

celery/app/task.py:1019

class Task(object):
# ...
    def _get_request(self):
        """Get current request object."""
        req = self.request_stack.top
        if req is None:
            # task was not called, but some may still expect a request
            # to be there, perhaps that should be deprecated.
            if self._default_request is None:
                self._default_request = Context()
            return self._default_request
        return req
    request = property(_get_request)
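
This also means the same context is reachable from inside a task itself. A minimal sketch, assuming your own app/broker configuration and the before_task_publish handler from the solution below:

from celery import Celery

app = Celery('example')  # assumption: use your own app/broker configuration

@app.task(bind=True)
def my_task(self):
    # self.request is the Context pushed by push_request(), so a custom
    # header added at publish time shows up as an attribute on it.
    return getattr(self.request, '__metadata__', {})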

This means the final solution is simply:

from celery.signals import before_task_publish, task_prerun

@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
    metadata = {"foo": "bar"}
    headers['__metadata__'] = metadata

@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
    metadata = getattr(task.request, '__metadata__', {}) 
    # metadata == {"foo": "bar"}

Note: task.request.__metadata__ would also work, but it fails for tasks that were enqueued before the signal handlers were in place. Using getattr with a default is safer.
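
To make the difference concrete (task here is the task instance received by the task_prerun handler above):

# Direct attribute access assumes the header is always present; it fails
# for messages that were published before the handler existed.
metadata = task.request.__metadata__

# getattr() with a default handles old and new messages alike.
metadata = getattr(task.request, '__metadata__', {})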

answered Feb 28 '23 by jrobichaud