
Celery Task Chain and Accessing **kwargs

I have a situation similar to the one outlined here, except that instead of chaining tasks with multiple arguments, I want to chain tasks that return a dictionary with multiple entries.

This is, very loosely and abstractly, what I'm trying to do:

tasks.py

@task()
def task1(item1=None, item2=None):
    item3 = ...  # do some stuff with item1 and item2 to yield item3
    return_object = dict(item1=item1, item2=item2, item3=item3)
    return return_object


@task()
def task2(item1=None, item2=None, item3=None):
    item4 = ...  # do something with item1, item2, item3 to yield item4
    return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
    return return_object

Working from IPython, I am able to call task1 individually and asynchronously with no problems.

I can ALSO call task2 individually with the result returned by task1 as a double-star argument:

>>> res1 = task1.s(item1=something, item2=something_else).apply_async()
>>> res1.status
'SUCCESS'
>>> res2 = task2.s(**res1.result).apply_async()
>>> res2.status
'SUCCESS'

However, what I ultimately want to achieve is the same end result as above, but via a chain, and here I can't figure out how to have task2 called not with positional arguments returned by task1, but with task1's result unpacked as **kwargs:

chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async()  #THIS DOESN'T WORK!

I suspect I could go back and rewrite my tasks so that they return positional arguments instead of a dictionary, and that this might clear things up, but it seems to me there should be some way to access task1's return object in task2 with the equivalent of the ** double-star unpacking. I also suspect that I'm missing something fairly obvious about either Celery's subtask implementation or *args vs. **kwargs. One workaround I can think of is sketched below.
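For example, I could keep the dictionary but have task2 accept it as a single positional argument and unpack it itself, since a chain passes the previous task's result as the first positional argument of the next task. Very roughly (the names and the placeholder computation here are only for illustration):

@task()
def task2(previous_result):
    # previous_result is the dict returned by task1
    item1 = previous_result['item1']
    item2 = previous_result['item2']
    item3 = previous_result['item3']
    item4 = (item1, item2, item3)  # placeholder: do something with item1, item2, item3
    return dict(item1=item1, item2=item2, item3=item3, item4=item4)

But that feels like working around the problem rather than solving it.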

Hope this makes sense. And thanks in advance for any tips.


1 Answer

This is my take on the problem, using an abstract task class:

from __future__ import absolute_import
from celery import Task
from myapp.tasks.celery import app


class ChainedTask(Task):
    abstract = True

    def __call__(self, *args, **kwargs):
        # If the only positional argument is a dict (e.g. the dict returned by
        # the previous task in the chain), unpack it into keyword arguments.
        if len(args) == 1 and isinstance(args[0], dict):
            kwargs.update(args[0])
            args = ()
        return super(ChainedTask, self).__call__(*args, **kwargs)


@app.task(base=ChainedTask)
def task1(x, y):
    return {'x': x * 2, 'y': y * 2, 'z': x * y}


@app.task(base=ChainedTask)
def task2(x, y, z):
    return {'x': x * 3, 'y': y * 3, 'z': z * 2}

You can now define and execute your chain like this:

from celery import chain

pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()
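
To check the end result, assuming a worker is running and a result backend is configured, you can keep the AsyncResult returned by apply_async and call get() on it; for a chain it yields the last task's return value:

result = pipe.apply_async()
print(result.get(timeout=10))  # with x=1, y=2 this yields {'x': 6, 'y': 12, 'z': 4}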