
Best way to map a generated list to a task in celery

I am looking for advice on the best way to map a list generated by one task onto another task in Celery.

Let's say I have a task called parse, which parses a PDF document and outputs a list of pages. Each page then needs to be passed individually to another task called feed. All of this needs to happen inside a task called process.

So, one way I could do that is this:

@celery.task
def process(path_to_pdf):
    # Blocks this worker until parse has finished.
    pages = parse.delay(path_to_pdf).get()

    feed.map(pages).delay()

Of course, that is not a good idea because I am calling get() inside a task.

Additionally this is inefficient, since my parse task is wrapped around a generator function and is able to yield pages, which means that it should be possible to queue the first page for feeding before the last page has been yielded by the parser.

Another possibility is to do this:

@celery.task
def process(path_to_pdf):
    for page in parse.delay(path_to_pdf).get():
        feed.delay(page)

That example still involves calling get() inside a task though. Additionally, this example is an oversimplification and I really need to do some things after all pages have been fed (i.e. in a chord).

I am looking for the best way to do this in Celery. I would appreciate any advice.

Thanks!

asked Apr 30 '13 at 03:04 by chaimp


1 Answer

This is probably far too late to be of use to you, but you probably want to use a task chain:

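from celery import chain

@celery.task
def process(path_to_pdf):
    # Launch the chain; parse's return value is passed on to feed_map.
    chain(parse.s(path_to_pdf), feed_map.s()).apply_async()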

@celery.task
def feed_map(pages):
    # feed.map() only builds a signature; apply_async() actually queues it.
    feed.map(pages).apply_async()

If you have some final task, say final, you could do this:

from celery import chord

@celery.task
def feed_map(pages):
    # One feed task per page; final receives the list of all results.
    chord(feed.s(page) for page in pages)(final.s())
answered Sep 28 '22 at 16:09 by theheadofabroom