Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python twisted: iterators and yields/inlineCallbacks

Tags:

python

twisted

Folks, Am thoroughly confused, so it's possible I am not even asking things correctly, but here goes:

I have a twisted application using inlineCallbacks. Now I need to define an iterator which will mean a generator is returned to the caller. However, the iterator cannot be inlineCallbacks decorated, can it be? If not, then how I do I code something like this.

Just to clarify: the goal is process_loop needs to be called every, say 5, seconds, it can process only ONE chunk of, say 10, and then it has to let go. However, to know that chunk of 10 (stored in cached, which is a dict of a dict), it needs to call a function that returns deferred.

@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k,v in cachedvalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'
like image 471
helpmelearn Avatar asked May 12 '11 02:05

helpmelearn


2 Answers

You're right that you can't express what you want to express in cacheiter. The inlineCallbacks decorator won't let you have a function that returns an iterator. If you decorate a function with it, then the result is a function that always returns a Deferred. That's what it is for.

Part of what makes this difficult is that iterators don't work well with asynchronous code. If there's a Deferred involved in producing the elements of your iterator, then the elements that come out of your iterator are going to be Deferreds first.

You might do something like this to account for that:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

This can work, but it looks particularly weird. Since generators can only yield to their caller (not, for example, to their caller's caller), the some_jobs iterator can't do anything about this; only code lexically within process_work can yield a Deferred to the inlineCallbacks-provided trampoline to wait on.

If you don't mind this pattern, then we could imaging your code being written something like:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

Another approach you might be able to take, though, is to use twisted.internet.task.cooperate. cooperate takes an iterator and consumes it, assuming that consuming it is potentially costly, and splitting up the job over multiple reactor iterations. Taking the definition of cacheiter from above:

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
like image 163
Jean-Paul Calderone Avatar answered Nov 10 '22 14:11

Jean-Paul Calderone


I think you're trying to do this:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k,v in cachedvalue.items())
            returnValue(gen)

When a genexp can't express what you're trying to return you can write a closure:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachedvalue=cachedvalue, cachename=cachename):
                for k,v in cachedvalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it
like image 1
Jochen Ritzel Avatar answered Nov 10 '22 14:11

Jochen Ritzel