
Chain two remote tasks in Celery using send_task

Tags:

python

celery

In Celery, I want to chain together two tasks that run on a remote worker. Can somebody tell me how to specify this in send_task? Or is there another way of calling remote tasks?

BR

asked Jul 06 '15 06:07 by mehdi



1 Answer

I was attempting to do the same thing. I couldn't find any built-in functionality to generate a task purely from its name, but it wasn't difficult to add such tooling:

from celery import Task as BaseTask

class Task(BaseTask):
    def __init__(self, name, *args, **kwargs):
        # Pass the subclass (not BaseTask) to super() so BaseTask's own initializer is not skipped.
        super(Task, self).__init__(*args, **kwargs)
        self.name = name

With this class, you can then do things like:

(
    Task('worker.hello').s('world') | 
    Task('messaging.email-results').s(email_address='[email protected]')
)()

Or, alternatively:

app.send_task(
    'worker.hello', ['world'], 
    chain=[
        Task('messaging.email-results').s(email_address='[email protected]')
    ]
)

EDIT:

Disregard the above. I've since realized that the correct way to do this is to use the Signature class (as mentioned by @Chrismit below):

from celery import Signature

(
    Signature('worker.hello', args=['world']) | 
    Signature('messaging.email-results', kwargs={'email_address':'[email protected]'})
)()

Or, alternatively:

from celery import Signature

app.send_task(
    'worker.hello', ['world'], 
    chain=[
        Signature('messaging.email-results', kwargs={'email_address': '[email protected]'})
    ]
)

An important note: any task after the first one in a chain is not actually scheduled until a worker has processed the task before it (this makes sense, as we don't know the input for a later task until the previous task has run). Each subsequent task is therefore scheduled by the worker that ran the previous one, using that worker's codebase and configuration. For this reason, you need to ensure that one of the following is true:

  • each of your workers knows the task_routes, so that it can place the follow-up tasks in the appropriate queue (e.g. in my example, it should know that tasks matching messaging.* should go in the 'messaging' queue)
  • you have encoded the correct queue into each Signature when you create the chain. Celery already has tooling to derive a queue name from the task name, which can be leaned on:

    def get_queue_name(task_name):
        return app.amqp.router.route({}, task_name)['queue'].name
    
    (
        Signature('worker.hello', args=['world']) | 
        Signature(
            'messaging.email-results',
            kwargs={'email_address':'[email protected]'},
            # If typing the task name twice annoys you, you could subclass
            # Signature to do this automatically.
            queue=get_queue_name('messaging.email-results')
        )
    )()
    

    (I think this is the cleanest solution, as it allows workers to not know about each other)

  • all tasks are executed in the default queue. If the worker has no task_routes declared and no queue is specified in the task's signature, Celery will schedule that task in the worker's default queue (the task_default_queue setting), which is 'celery' unless customized. I strongly recommend against doing this, as it isn't very explicit and doesn't allow for much queue management, but it is an option nonetheless.
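
To make the three cases above concrete, here is a minimal plain-Python model of the decision the publishing worker makes for a follow-up task. It is only a sketch and not Celery's actual router: `resolve_queue`, the dict-shaped "signatures", and the flat pattern-to-queue mapping are all hypothetical simplifications, assuming that an explicit queue on the signature takes precedence, then the publisher's own routes, then the default queue.

```python
from fnmatch import fnmatchcase

# Celery's default queue name unless task_default_queue is customized.
DEFAULT_QUEUE = "celery"


def resolve_queue(signature, task_routes):
    """Simplified model of how the publishing process picks a queue.

    `signature` is a plain dict with a "task" name and an optional "queue";
    `task_routes` maps glob patterns to queue names. Both shapes are
    assumptions made for this illustration only.
    """
    # 1. A queue encoded in the signature travels with the chain and wins.
    if signature.get("queue"):
        return signature["queue"]
    # 2. Otherwise the routes known to *this* process are consulted.
    for pattern, queue in task_routes.items():
        if fnmatchcase(signature["task"], pattern):
            return queue
    # 3. With no explicit queue and no matching route, fall back to the default.
    return DEFAULT_QUEUE


follow_up = {"task": "messaging.email-results"}

# Worker with no routes and no queue on the signature: default queue.
print(resolve_queue(follow_up, {}))
# Worker that knows the routes: the follow-up is placed correctly.
print(resolve_queue(follow_up, {"messaging.*": "messaging"}))
# Queue encoded in the signature: correct even if the worker knows no routes.
print(resolve_queue(dict(follow_up, queue="messaging"), {}))
```

The last call is the situation the second bullet aims for: the worker that finishes the first task needs no routing knowledge of its own, because the queue was decided when the chain was built.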
answered Sep 17 '22 17:09 by alukach
