
Twisted: Waiting for subtasks to finish

In my code, I have two hypothetical tasks: one gets urls from a generator and batch downloads them using Twisted's Cooperator, and the other takes a downloaded source and asynchronously parses it. I'm trying to encapsulate all of the fetch and parse tasks into a single Deferred object that calls back when all pages are downloaded and all sources are parsed.

I've come up with the following solution:

from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage


BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)

The code works, but I feel as if I'm either missing something blatantly obvious, or ignorant of a simple Twisted pattern that would make this a lot simpler. Is there a better way to return a single Deferred that calls back when all fetching and parsing is finished?

asked Dec 02 '13 by enderskill

1 Answer

As currently written, it looks to me like this code will have a limited number of parallel downloads, but an unlimited number of parallel parse jobs. Is that intentional? I'm going to assume "no", since if your network happens to be fast and your parser happens to be slow, as the number of URLs approaches infinity, so does your memory usage :).

So here's a version that has limited parallelism, but performs each parse in sequence with its download instead:

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

This works because parse (apparently) returns a Deferred; adding it as a callback to the one returned by getPage results in a Deferred that won't fire the callback added by coiterate until parse has done its business.

Since you were asking about idiomatic Twisted code, I've also taken the liberty of modernizing it a bit (using task.react rather than running the reactor manually, inlining expressions to make things briefer and so on).

If you really do want to have more parallel parses than parallel fetches, something like this might work better:

from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            def releaseAndReturn(result):
                # release() returns None, so release first and then
                # pass the parse result through the addBoth chain
                parseSemaphore.release()
                return result
            parse(r).addBoth(releaseAndReturn)
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

You can see that parseWhenReady returns the Deferred returned from acquire, so parallel fetching will only continue as soon as a parallel parse can begin, and therefore you won't continue fetching indiscriminately even when the parser is overloaded. However, parallelParse carefully abstains from returning the Deferred returned by parse, since fetching should be able to continue while parses are proceeding.

(Please note that since your initial example was not runnable, I haven't tested either of these at all. Hopefully the intent is clear even if there are bugs.)

answered Oct 04 '22 by Glyph