In celery I want to chain two tasks from remote worker together. Can somebody tell me how to specify it in send_task? Or is there any other way of calling remote tasks?
BR
I was attempting to do the same thing. I couldn't find any built-in functionality to generate a task purely by its name, however it wasn't difficult to add such tooling:
from celery import Task as BaseTask
class Task(BaseTask):
def __init__(self, name, *args, **kwargs):
super(BaseTask, self).__init__(*args, **kwargs)
self.name = name
With this class, you can then do things like:
(
Task('worker.hello').s('world') |
Task('messaging.email-results').s(email_address='[email protected]')
)()
Or, alternatively:
app.send_task(
'worker.hello', ['world'],
chain=[
Task('messaging.email-results').s(email_address='[email protected]')
]
)
EDIT:
Disregard above, I've since realized that the correct way to do this is by using the Signature
class (as mentioned by @Chrismit below):
from celery import Signature
(
Signature('worker.hello', args=['world']) |
Signature('messaging.email-results', kwargs={'email_address':'[email protected]'})
)()
Or, alternatively:
from celery import Signature
app.send_task(
'worker.hello', ['world'],
chain=[
Signature('messaging.email-results', kwargs={'email_address': '[email protected]'})
]
)
An important note: Any tasks after the first task in a chain are not actually scheduled until a worker processes the task before it (this makes sense, as we don't know the input for later tasks until the previous task is run). The subsequent tasks are scheduled on the codebase of the worker. For this reason, you need to ensure that one of the following are true:
task_routes
so that it can place the followup tasks in the appropriate queue (e.g. in my example, it should know that tasks starting with messaging.*
should go in the 'messaging'
queue)you have encoded the correct queue
into each Signature
class when you create the chain. Celery already has tooling to derive a queue name from the task name which can be leaned on:
def get_queue_name(task_name):
return app.amqp.router.route({}, task_name)['queue'].name
(
Signature('worker.hello', args=['world']) |
Signature(
'messaging.email-results',
kwargs={'email_address':'[email protected]'},
queue=get_queue_name('messaging.email-results') # If typing the task name twice annoys you, you could subclass Signature to do this automatically
)
)()
(I think this is the cleanest solution, as it allows workers to not know about each other)
task_routes
declared on the worker and there is no queue
specified in the task's signature, then Celery will schedule that task in the worker's default_queue
. Unless customized, that is 'celery'
. I strongly recommend against doing this as it isn't very explicit and doesn't allow for much queue management, but nonetheless it is an option.If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With