Chain a celery task's results into a distributed group

Like in this other question, I want to create a celery group from a list that's returned by a celery task. The idea is that the first task will return a list, and the second task will explode that list into concurrent tasks for every item in the list.

The plan is to use this while downloading content. The first task gets links from a website, and the second task is a chain that downloads the page, processes it, and then uploads it to s3. Finally, once all the subpages are done, the website is marked as done in our DB. Something like:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)

The solution I've seen so far seems to do an adequate job of this, but fails when the second task is a chain, due to issues with clone not doing a deepcopy (see the comments on this answer for details):

from celery import group, subtask

@task  # your app's task decorator, e.g. @app.task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg, ]) for arg in it)()

It also has the problem that if the iterable is 10,000 items long, it will create a group with 10,000 items. That is blowing up our memory usage, as you can imagine.

So, what I'm looking for is a way to do dmap that:

  • Doesn't blow up RAM by creating monstrous groups (maybe there's a way to chunk through the iterable?)
  • Works on celery chains without hitting the clone/shallow-copy issue described above.
asked Mar 29 '17 by mlissner


2 Answers

Celery canvas provides chunks to split a long list of arguments into batches that each run as a single task. Unfortunately, chunks won't compose with primitives like chain or group.
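For context, a minimal sketch of how chunks works on a plain task (the app, broker, and add task here are placeholders, not part of the question's code):

from celery import Celery

app = Celery('tasks',
             broker='redis://localhost:6379/0',   # placeholder broker
             backend='redis://localhost:6379/0')  # placeholder result backend

@app.task
def add(x, y):
    return x + y

# 100 (x, y) pairs split into 10 chunk tasks of 10 calls each.
res = add.chunks(zip(range(100), range(100)), 10)()
print(res.get())  # 10 lists of 10 sums each

Each chunk runs as one ordinary task call, which is exactly why it can't wrap the per-item chain the question needs.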

You can use celery signals to sidestep the dmap/clone problem altogether.

from celery import chain, chord
from celery.signals import task_success

def ch(link):
    # Build a fresh chain per link; cloning one shared chain signature
    # runs into the shallow-copy problem described in the question.
    return chain(
        download_sub_page.s(link),
        process_sub_page.s(),
        upload_sub_page.s(),
    )

@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, result=None, **kwargs):
    # result is the list of links returned by get_links_from_website.
    header = [ch(link) for link in result]
    callback = mark_website_done.si()
    chord(header)(callback)

Build a chain for each sub-page and hook the final task onto them with a chord. The handler runs whenever get_links_from_website completes successfully.

Depending on how long the chains take, you can also save the results of get_links_from_website somewhere, then iterate over them in batches to queue up chains; with the last batch you can hook the callback onto the final task, as sketched below.
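A rough sketch of that batching idea (queue_page_batches and BATCH_SIZE are hypothetical names, and the links are assumed to have been saved somewhere queryable):

from celery import chain, chord, group

BATCH_SIZE = 500  # hypothetical; tune to what your broker and workers can absorb

def queue_page_batches(links):
    # One chain signature per link, built fresh to avoid cloning a shared chain.
    sigs = [
        chain(download_sub_page.s(link),
              process_sub_page.s(),
              upload_sub_page.s())
        for link in links
    ]
    batches = [sigs[i:i + BATCH_SIZE] for i in range(0, len(sigs), BATCH_SIZE)]
    for i, batch in enumerate(batches):
        if i == len(batches) - 1:
            # Last batch: close it with the completion callback.
            chord(batch)(mark_website_done.si())
        else:
            group(batch).apply_async()

Note the trade-off: the chord only waits on the last batch, so mark_website_done can fire while earlier batches are still running unless you track them separately.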

answered Sep 29 '22 by Pandikunta Anand Reddy

This is a bit hacky, but we're using deepcopy to clone the callback; this works around the bug in Signature's shallow copy:

import copy

from celery import chord, group, subtask

@task  # your app's task decorator, e.g. @app.task
def dmap(it, callback, final=None):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)

    # deepcopy the signature's dict before cloning so nested signatures
    # (e.g. the tasks inside a chain) are not shared between clones.
    run_in_parallel = group(
        subtask(copy.deepcopy(dict(callback))).clone([arg, ]) for arg in it
    )

    if len(run_in_parallel.tasks) == 0:
        return []

    if final:
        return chord(run_in_parallel)(final)

    return run_in_parallel.delay()

Note that this only works for one nesting level (i.e. the callback is a chain/group/chord); it will not work for deeply nested callbacks.

For deeply nested callback graphs we use this hack, which is a bit slower but works flawlessly:

import pickle

# Hack to completely clone a signature with possibly complex subtasks (chains, chords, etc...)
run_in_parallel = group(pickle.loads(pickle.dumps(callback)).clone([arg, ]) for arg in it)

And for the size of the groups, you can always split the iterable into chunks before handing each one to dmap, as sketched below.
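A minimal sketch of that (iter_chunks and CHUNK_SIZE are hypothetical; links and callback are whatever you would otherwise pass to a single dmap call):

from itertools import islice

CHUNK_SIZE = 100  # hypothetical; size each group to your memory budget

def iter_chunks(iterable, size):
    # Yield successive lists of `size` items from any iterable.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk

# Queue one modest dmap group per chunk instead of a single 10,000-item group.
for chunk in iter_chunks(links, CHUNK_SIZE):
    dmap.delay(chunk, callback)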

answered Sep 29 '22 by Jether