Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Queue remote calls to a Python Twisted perspective broker?

The strength of Twisted (for python) is its asynchronous framework (I think). I've written an image processing server that takes requests via Perspective Broker. It works great as long as I feed it less than a couple hundred images at a time. However, sometimes it gets spiked with hundreds of images at virtually the same time. Because it tries to process them all concurrently the server crashes.

As a solution I'd like to queue up the remote_calls on the server so that it only processes ~100 images at a time. It seems like this might be something that Twisted already does, but I can't seem to find it. Any ideas on how to start implementing this? A point in the right direction? Thanks!

like image 813
agartland Avatar asked May 18 '10 23:05

agartland


1 Answers

One ready-made option that might help with this is twisted.internet.defer.DeferredSemaphore. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.

A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.

Here's an example of using DeferredSemaphore to run ten asynchronous operations, but to run at most three of them at once:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore also has explicit acquire and release methods, but the run method is so convenient it's almost always what you want. It calls the acquire method, which returns a Deferred. To that first Deferred, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a Deferred, then to that second Deferred a callback is added which calls the release method.

The synchronous case is handled as well, by immediately calling release. Errors are also handled, by allowing them to propagate but making sure the necessary release is done to leave the DeferredSemaphore in a consistent state. The result of the function passed to run (or the result of the Deferred it returns) becomes the result of the Deferred returned by run.

Another possible approach might be based on DeferredQueue and cooperate. DeferredQueue is mostly like a normal queue, but its get method returns a Deferred. If there happen to be no items in the queue at the time of the call, the Deferred will not fire until an item is added.

Here's an example:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async(n):
    print 'Starting job', n
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print 'Finishing job', n
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Create new jobs to be processed
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

Note that the async worker function is the same as the one from the first example. However, this time, there's also a worker function which is explicitly pulling jobs out of the DeferredQueue and processing them with async (by adding async as a callback to the Deferred returned by get). The worker generator is driven by cooperate, which iterates it once after each Deferred it yields fires. The main loop, then, starts three of these worker generators so that three jobs will be in progress at any given time.

This approach involves a bit more code than the DeferredSemaphore approach, but has some benefits which may be interesting. First, cooperate returns a CooperativeTask instance which has useful methods like pause, resume, and a couple others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name). On the DeferredQueue side, it's also possible to set limits on how many items are pending processing, so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling put handles the queue overflow exception, you can use this as pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator). Doing similar things with DeferredSemaphore is a bit trickier, since there's no way to limit how many jobs are waiting to be able to acquire the semaphore.

like image 119
Jean-Paul Calderone Avatar answered Nov 02 '22 03:11

Jean-Paul Calderone