What is the best way to refactor generators pipeline as coroutines?

Consider this code:

#!/usr/bin/env python
# coding=utf-8
from string import letters  # Python 2 only; Python 3 renamed this to string.ascii_letters


def filter_upper(letters):
    for letter in letters:
        if letter.isupper():
            yield letter

def filter_selected(letters, selected):
    selected = set(map(str.lower, selected))
    for letter in letters:
        if letter.lower() in selected:
            yield letter


def main():
    stuff = filter_selected(filter_upper(letters), ['a', 'b', 'c'])
    print(list(stuff))

main()

This is an illustration of a pipeline constructed from generators. I often use this pattern in practice to build data processing flows; it's like UNIX pipes. Running this prints ['A', 'B', 'C'].

What is the most elegant way to refactor these generators into coroutines that suspend execution at every yield?

UPDATE

My first try was like this:

#!/usr/bin/env python
# coding=utf-8

import asyncio

@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        yield from future


@asyncio.coroutine
def coro2():
    a = yield from coro()
    print(a)


loop = asyncio.get_event_loop()
loop.run_until_complete(coro2())

But for some reason it doesn't work: the variable a becomes None.
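
A minimal sketch of why, using only pieces from the attempt above: the expression yield from future evaluates to the future's result, but coro() discards those values and falls off the end, so its implicit return value is None, and that None is what a = yield from coro() receives. Capturing and returning the results fixes it:

@asyncio.coroutine
def coro():
    results = []
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        results.append((yield from future))  # capture each future's result
    return results  # now `a = yield from coro()` gets ['a', 'b', 'c']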

UPDATE #1

What I came up with recently:

Server:

#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and send it strings from user input."""


import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555

s.bind((host, port))
s.listen(1)

print('Listening...')

conn, addr = s.accept()

print('Client ({}) connected.'.format(addr))

while True:
    conn.send(raw_input('Enter data to send: '))
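
(A side note: raw_input() makes this server Python 2. A hedged sketch of the same loop on Python 3, where sockets require bytes:)

# Python 3 version of the send loop: input() replaces raw_input(),
# and the outgoing string must be encoded to bytes.
while True:
    conn.send(input('Enter data to send: ').encode('utf-8'))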

Client:

#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""

import trollius as asyncio
from trollius import From


@asyncio.coroutine
def upper(input, output):
    while True:
        char = yield From(input.get())
        print('Got char: ', char)
        yield From(output.put(char.upper()))


@asyncio.coroutine
def glue(input, output):
    chunk = []
    while True:
        e = yield From(input.get())
        chunk.append(e)
        print('Current chunk: ', chunk)
        if len(chunk) == 3:
            yield From(output.put(chunk))
            chunk = []


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop))
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()
    q3 = asyncio.Queue()

    @asyncio.coroutine
    def printer():
        while True:
            print('Pipeline output: ', (yield From(q3.get())))

    asyncio.async(upper(q1, q2))
    asyncio.async(glue(q2, q3))
    asyncio.async(printer())

    while True:
        data = yield From(reader.read(100))
        print('Data: ', data)

        for byte in data:
            yield From(q1.put(byte))

    print('Close the socket')
    writer.close()


@asyncio.coroutine
def background_stuff():
    while True:
        yield From(asyncio.sleep(3))
        print('Other background stuff...')


loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()

The advantage over "David Beazley's coroutines" is that you can use all of the asyncio machinery inside such processing units, with input and output queues.
A disadvantage is the number of queue instances needed to connect the pipeline units. It could be fixed with a data structure more advanced than asyncio.Queue.
Another disadvantage is that such processing units do not propagate their exceptions to a parent stack frame, because they are "background tasks", whereas "David Beazley's coroutines" do propagate them.
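
One hedged mitigation (my sketch, not tested against the client above): keep the Task objects returned by asyncio.async() and supervise them with gather(), whose future fails as soon as any task raises, so the exception re-surfaces in the awaiting coroutine.

# Speculative sketch, reusing the names from the client above:
tasks = [asyncio.async(upper(q1, q2)),
         asyncio.async(glue(q2, q3)),
         asyncio.async(printer())]

@asyncio.coroutine
def supervise():
    # Awaiting the gather() future re-raises the first exception from
    # any pipeline stage instead of letting it be silently swallowed.
    yield From(asyncio.gather(*tasks))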

UPDATE #2

Here's what I eventually came up with:
https://gist.github.com/AndrewPashkin/04c287def6d165fc2832

asked Nov 30 '14 by Gill Bates



1 Answer

I think the answer here is "you don't". I'm guessing that you're getting this idea from David Beazley's famous coroutine/generator tutorial. In his tutorials, he's using coroutines as basically a reversed generator pipeline: instead of pulling data through the pipeline by iterating over generators, you push data through it with gen_object.send(). With that notion of coroutines, your first example would look something like this:

from string import letters

def coroutine(func):
    # "Primes" a coroutine: advance it to the first `yield` so that
    # send() can be used on it right away (Python 2 spelling: cr.next()).
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start

@coroutine
def filter_upper(target):
    while True:
        letter = yield
        if letter.isupper():
            target.send(letter)

@coroutine
def filter_selected(selected):
    selected = set(map(str.lower, selected))
    out = []
    try:
        while True:
            letter = yield
            if letter.lower() in selected:
                out.append(letter)
    except GeneratorExit:
        print out

def main():
    filt = filter_upper(filter_selected(['a', 'b', 'c']))
    for letter in letters:
        filt.send(letter)
    filt.close()

if __name__ == "__main__":
    main()
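
(Note this snippet is Python 2, like the question: cr.next(), print out, and string.letters. On Python 3 you'd use next(cr), print(out), and string.ascii_letters. Running it should print ['A', 'B', 'C'] when filt.close() raises GeneratorExit inside filter_selected.)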

Now, the coroutines in asyncio are similar in that they're suspendable generator objects which can have data sent into them, but they really aren't meant for the data pipelining use-case at all. They're meant to be used to enable concurrency when you're executing blocking I/O operations. The yield from suspension points allow control to return to the event loop while the I/O happens, and the event loop will restart the coroutine when it completes, sending the data returned by the I/O call into the coroutine. There's really no practical reason to try to use them for this kind of use case, since there's no blocking I/O happening at all.
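
To make that concrete, here's a minimal sketch of the use-case they are built for (the fetch/delay names are mine, and asyncio.sleep stands in for real I/O):

import asyncio

@asyncio.coroutine
def fetch(name, delay):
    # Suspends this coroutine and returns control to the event loop
    # while the (simulated) I/O is in flight.
    yield from asyncio.sleep(delay)
    return '{} done'.format(name)

loop = asyncio.get_event_loop()
print(loop.run_until_complete(
    asyncio.gather(fetch('a', 0.2), fetch('b', 0.1))))
# -> ['a done', 'b done']  (both coroutines make progress concurrently)
loop.close()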

Also, the problem with your attempt at using asyncio is that a = yield from coro() is assigning a to the return value of coro. But you're not actually returning anything from coro. You're caught somewhere between treating coro as an actual coroutine and a generator. It looks like you're expecting yield from future to send the contents of future from coro to coro2, but that's not how coroutines work. yield from is used to pull data from a coroutine/Future/Task, and return is used to actually send an object back to the caller. So, for coro to actually return something to coro2, you need to do this:

@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        return future

But that's just going to end with 'a' being returned to coro2. I think to get the output you expect you'd need to do this:

@asyncio.coroutine
def coro():
    future = asyncio.Future()
    future.set_result(['a', 'b', 'c'])
    return future

Which maybe demonstrates why asyncio coroutines are not what you want here.
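
(A hedged aside: since no real I/O is involved, the Future isn't needed at all; @asyncio.coroutine also accepts a plain function, so the value can be returned directly:)

@asyncio.coroutine
def coro():
    # No I/O, so no Future is required; `a = yield from coro()`
    # receives this list directly.
    return ['a', 'b', 'c']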

Edit:

Ok, given a case where you want to use pipelining in addition to actually making use of async I/O, I think the approach you used in your update is good. As you suggested, it can be made simpler by creating a data structure to help automate the queue management:

class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            asyncio.async(node(in_, out))
            in_ = out

    @asyncio.coroutine
    def put(self, val):
        yield from self.start.put(val)

# ... (most code is unchanged)

@asyncio.coroutine
def printer(input_, output):
    # For simplicity, I have the sink taking an output queue. It's not being used,
    # but you could make the final output queue accessible from the Pipeline object
    # and then add a get() method to the `Pipeline` itself.
    while True:
        print('Pipeline output: ', (yield from input_.get()))


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop)
    pipe = Pipeline(upper, glue, printer)

    while True:
        data = yield from reader.read(100)
        if not data:
            break
        print('Data: ', data)

        for byte in data.decode('utf-8'):
            yield from pipe.put(byte)  # Add to the pipe

    print('Close the socket')
    writer.close()

This simplifies the queue management, but it doesn't address the exception-handling issue. I'm not sure much can be done about that...
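
One speculative direction (my sketch, not part of the answer above): have Pipeline keep its Task objects so a caller can at least detect a failed stage and re-raise its exception instead of losing it.

class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        self.tasks = []  # keep the stage tasks instead of dropping them
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            self.tasks.append(asyncio.async(node(in_, out)))
            in_ = out

    @asyncio.coroutine
    def put(self, val):
        # Surface the first exception from any finished stage before
        # feeding more data into a broken pipeline.
        for task in self.tasks:
            if task.done() and task.exception() is not None:
                raise task.exception()
        yield from self.start.put(val)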

answered Oct 16 '22 by dano