
Celery chunks inside chain

I would like to use chunks inside a Celery chain.

chain = task1.s(arg1) | task2.chunks(?,CHUNK_SIZE) | task3.chunks(?, CHUNK_SIZE)

Basically, what I would like to do is run task1, chunk its results, and send the chunks to task2, which should in turn call task3, which should also receive chunked results from task2 to finish the process. Why? Because task1 and task2 can both return a fair number of items, which I would like to process in batches.

The code above does not work, since I'm not quite sure what to put in place of the question marks to make it work.

I'm not even sure this is possible, since searching didn't turn up many results, so in case such a workflow can't be constructed I'd be interested in reasonable alternatives.

asked Sep 13 '13 by user962563

1 Answer

I'm not sure whether this is even possible with the existing primitives.

I can think of two alternatives/workarounds:

  1. Use chunks/chords to initiate new tasks from within a task.

    You may have already thought of this one. The idea is to call task1 normally with apply_async. Once task1 has generated the large output that needs chunking, simply use the chunks primitive to create chunks for task2, and do the same for the transition between task2 and task3. Calling tasks from within tasks is only a bad idea when you end up blocking to fetch the inner task's results. So remember: if you need to wait on the task results, this would not be a recommended approach.

    from celery import task

    @task
    def task1(some_input):
        # Do stuff, then build a list of lists where each inner
        # list holds the *args for one individual task2 call
        task2.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()

    @task
    def task2(a, b):
        # Do stuff, then build a list of lists where each inner
        # list holds the *args for one individual task3 call
        task3.chunks([[i, j], [i, j], [i, j]], CHUNK_SIZE).apply_async()

    @task
    def task3(a, b):
        # Do stuff
  2. This solution is a bit more interesting. I came across a relevant request on the celery GitHub issues page. Check out this pull request from steeve: https://github.com/celery/celery/pull/817 From what I've understood, he's created a dynamic task decorator (there's a debate about whether that's the right name) which detects when a task returns a subtask; if so, it applies that subtask first. He claims to be using it successfully in production at Veezio. I haven't tried it myself, so I'd suggest heading over to that thread and asking a few questions, or pinging steeve about it on Twitter or IRC.
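For context on option 1: all the first argument to chunks does is get partitioned into consecutive groups of CHUNK_SIZE, with each group dispatched as one worker task. A plain-Python sketch of that partitioning (the names `pairs`, `split_into_chunks`, and the sample data are illustrative, not Celery internals):

```python
CHUNK_SIZE = 2

def split_into_chunks(arg_tuples, size):
    """Split a list into consecutive groups of at most `size` items,
    mirroring how the chunks primitive batches its calls."""
    return [arg_tuples[i:i + size] for i in range(0, len(arg_tuples), size)]

# Each inner list is the *args for one task invocation.
pairs = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
print(split_into_chunks(pairs, CHUNK_SIZE))
# [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[9, 10]]]
```

So in the snippet above, three `[i, j]` arg lists with CHUNK_SIZE=2 would produce two batches: one with two calls and one with a single call.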
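To make the idea behind option 2 concrete, here is a toy, Celery-free emulation of "a task that returns a subtask gets that subtask applied automatically". Everything here (the `dynamic` decorator, the lambda-as-subtask) is invented for illustration; the actual code in the pull request is different and operates on real Celery subtasks:

```python
def dynamic(func):
    """Toy decorator: if the wrapped task returns another callable
    ("subtask"), invoke it and return its result instead."""
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        if callable(result):
            # The task handed back the next step; run it now.
            return result()
        return result
    return wrapper

@dynamic
def task1(x):
    # Instead of returning a plain value, return the next step.
    return lambda: task2(x * 2)

@dynamic
def task2(y):
    return y + 1

print(task1(10))  # 21
```

The appeal of this pattern is that task1 decides at runtime what runs next, which is exactly what a static chain signature can't express.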

answered Oct 13 '22 by arijeet