Consider this code:
#!/usr/bin/env python
# coding=utf-8
from string import letters


def filter_upper(letters):
    for letter in letters:
        if letter.isupper():
            yield letter


def filter_selected(letters, selected):
    selected = set(map(str.lower, selected))
    for letter in letters:
        if letter.lower() in selected:
            yield letter


def main():
    stuff = filter_selected(filter_upper(letters), ['a', 'b', 'c'])
    print(list(stuff))

main()
This illustrates a pipeline constructed from generators. I often use this pattern in practice to build data-processing flows; it works like UNIX pipes.
What is the most elegant way to refactor these generators into coroutines that suspend execution at every yield?
My first try was like this:
#!/usr/bin/env python
# coding=utf-8
import asyncio


@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        yield from future


@asyncio.coroutine
def coro2():
    a = yield from coro()
    print(a)

loop = asyncio.get_event_loop()
loop.run_until_complete(coro2())
But for some reason it doesn't work: the variable a becomes None.
What I came up with recently:
Server:
#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and send it strings from user input."""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555
s.bind((host, port))
s.listen(1)
print('Listening...')
conn, addr = s.accept()
print('Client ({}) connected.'.format(addr))
while True:
conn.send(raw_input('Enter data to send: '))
Client:
#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""
import trollius as asyncio
from trollius import From
@asyncio.coroutine
def upper(input, output):
    while True:
        char = yield From(input.get())
        print('Got char: ', char)
        yield From(output.put(char.upper()))


@asyncio.coroutine
def glue(input, output):
    chunk = []
    while True:
        e = yield From(input.get())
        chunk.append(e)
        print('Current chunk: ', chunk)
        if len(chunk) == 3:
            yield From(output.put(chunk))
            chunk = []


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield From(asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop))
    q1 = asyncio.Queue()
    q2 = asyncio.Queue()
    q3 = asyncio.Queue()

    @asyncio.coroutine
    def printer():
        while True:
            print('Pipeline output: ', (yield From(q3.get())))

    asyncio.async(upper(q1, q2))
    asyncio.async(glue(q2, q3))
    asyncio.async(printer())

    while True:
        data = yield From(reader.read(100))
        print('Data: ', data)
        for byte in data:
            yield From(q1.put(byte))

    print('Close the socket')
    writer.close()


@asyncio.coroutine
def background_stuff():
    while True:
        yield From(asyncio.sleep(3))
        print('Other background stuff...')

loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()
The advantage over "David Beazley's coroutines" is that you can use all of asyncio inside such processing units, which have input and output queues.
One disadvantage is that a lot of queue instances are needed to connect the pipeline units. This can be fixed by using a data structure more advanced than asyncio.Queue.
Another disadvantage is that such processing units do not propagate their exceptions to a parent stack frame, because they are "background tasks", whereas "David Beazley's coroutines" do propagate.
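To make the second disadvantage concrete, here's a minimal sketch (in the same old-style asyncio idiom as above; failing_unit and report are made-up names for this illustration) showing that an exception raised inside a background task never reaches the caller's stack frame unless someone retrieves the task's result, e.g. through a done-callback:

import asyncio


@asyncio.coroutine
def failing_unit():
    yield from asyncio.sleep(0.1)
    raise RuntimeError('boom')  # never propagates to run_until_complete's caller


def report(task):
    # Retrieving the result re-raises the exception, making it at least visible.
    try:
        task.result()
    except Exception as e:
        print('Background unit failed:', e)

loop = asyncio.get_event_loop()
task = asyncio.async(failing_unit())  # spelled ensure_future() in newer asyncio
task.add_done_callback(report)
loop.run_until_complete(asyncio.sleep(0.5))
loop.close()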
Here's what I came up with:
https://gist.github.com/AndrewPashkin/04c287def6d165fc2832
In Python, coroutines are similar to generators, but with a few extra methods and slight changes in how we use yield statements. Generators produce data for iteration, while coroutines can also consume data.
Generators let you create functions that look like iterators to the consumer. Coroutines are an extension of the concept of traditional functions: a function hands control back to its caller once, through the return statement, while a coroutine hands control back any number of times via yield.
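A minimal illustration of that difference (producer and consumer are names made up for this sketch):

def producer():
    # A generator produces values for its consumer.
    for x in (1, 2, 3):
        yield x


def consumer():
    # A coroutine consumes values sent into it.
    total = 0
    while True:
        value = yield  # execution suspends here until send()
        total += value
        print('running total:', total)

c = consumer()
next(c)  # "prime" the coroutine: advance it to the first yield
for value in producer():
    c.send(value)  # prints running totals 1, 3, 6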
In summary, throw() behaves like next() or send(), except it raises an exception at the suspension point. If the generator is already in the closed state, throw() just raises the exception it was passed without executing any of the generator's code.
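For example, a small sketch of throw() landing at the suspension point:

def gen():
    try:
        while True:
            yield
    except ValueError as e:
        print('caught at the suspension point:', e)
        yield 'recovered'

g = gen()
next(g)  # advance to the first yield
print(g.throw(ValueError('bad input')))  # prints the 'caught ...' line, then 'recovered'
g.close()
# A further g.throw(ValueError('late')) would now just raise ValueError in the caller.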
Simply put, these are generators written as asynchronous functions: instead of def function(..), they use async def function(..).
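For instance, a minimal sketch in that modern spelling (Python 3.5+; fetch_char is a made-up name):

import asyncio


async def fetch_char(queue):
    # 'await' plays the role 'yield from' plays in the older examples:
    # it suspends this coroutine until the queue has an item.
    return await queue.get()


async def main():
    queue = asyncio.Queue()
    await queue.put('a')
    print(await fetch_char(queue))

asyncio.run(main())  # Python 3.7+; on 3.5/3.6 use loop.run_until_complete(main())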
I think the answer here is "you don't". I'm guessing that you're getting this idea from David Beazley's famous coroutine/generator tutorial. In his tutorials, he's using coroutines as basically a reversed generator pipeline: instead of pulling data through the pipeline by iterating over generators, you push data through the pipeline using gen_object.send(). Your first example would look something like this using that notion of coroutines:
from string import letters

def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start

@coroutine
def filter_upper(target):
    while True:
        letter = yield
        if letter.isupper():
            target.send(letter)

@coroutine
def filter_selected(selected):
    selected = set(map(str.lower, selected))
    out = []
    try:
        while True:
            letter = yield
            if letter.lower() in selected:
                out.append(letter)
    except GeneratorExit:
        print out

def main():
    filt = filter_upper(filter_selected(['a', 'b', 'c']))
    for letter in letters:
        filt.send(letter)
    filt.close()

if __name__ == "__main__":
    main()
Now, the coroutines in asyncio are similar in that they're suspendable generator objects which can have data sent into them, but they really aren't meant for the data-pipelining use case at all. They're meant to enable concurrency when you're executing blocking I/O operations. The yield from suspension points allow control to return to the event loop while the I/O happens, and the event loop will restart the coroutine when it completes, sending the data returned by the I/O call into the coroutine. There's really no practical reason to try to use them for this kind of use case, since there's no blocking I/O happening at all.
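To see what those suspension points buy you, here's a minimal sketch (using asyncio.sleep() as a stand-in for blocking I/O; worker is a made-up name) where two coroutines interleave because each yield from hands control back to the event loop:

import asyncio


@asyncio.coroutine
def worker(name, delay):
    for i in range(3):
        # Control returns to the event loop here until the "I/O" finishes.
        yield from asyncio.sleep(delay)
        print(name, 'step', i)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    worker('slow', 0.2),
    worker('fast', 0.1),
))
loop.close()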
Also, the problem with your attempt at using asyncio is that a = yield from coro() is assigning a to the return value of coro. But you're not actually returning anything from coro. You're caught somewhere between treating coro as an actual coroutine and a generator. It looks like you're expecting yield from future to send the contents of future from coro to coro2, but that's not how coroutines work. yield from is used to pull data from a coroutine/Future/Task, and return is used to actually send an object back to the caller. So, for coro to actually return something to coro2, you need to do this:
@asyncio.coroutine
def coro():
    for e in ['a', 'b', 'c']:
        future = asyncio.Future()
        future.set_result(e)
        return future
But that's just going to end with 'a' being returned to coro2. I think to get the output you expect, you'd need to do this:
@asyncio.coroutine
def coro():
    future = asyncio.Future()
    future.set_result(['a', 'b', 'c'])
    return future
Which maybe demonstrates why asyncio coroutines are not what you want here.
Edit:
Ok, given a case where you want to use pipelining in addition to actually making use of async I/O, I think the approach you used in your update is good. As you suggested, it can be made simpler by creating a data structure to help automate the queue management:
class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            asyncio.async(node(in_, out))
            in_ = out

    @asyncio.coroutine
    def put(self, val):
        yield from self.start.put(val)
# ... (most code is unchanged)

@asyncio.coroutine
def printer(input_, output):
    # For simplicity, I have the sink taking an output queue. It's not being used,
    # but you could make the final output queue accessible from the Pipeline object
    # and then add a get() method to the Pipeline itself.
    while True:
        print('Pipeline output: ', (yield from input_.get()))


@asyncio.coroutine
def tcp_echo_client(loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 5555,
                                                        loop=loop)
    pipe = Pipeline(upper, glue, printer)
    while True:
        data = yield from reader.read(100)
        if not data:
            break
        print('Data: ', data)
        for byte in data.decode('utf-8'):
            yield from pipe.put(byte)  # Add to the pipe
    print('Close the socket')
    writer.close()
This simplifies Queue management, but doesn't address the exception handling issue. I'm not sure if much can be done about that...
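One partial mitigation, sketched below (not a full fix): have Pipeline keep the tasks it spawns and attach a done-callback that at least surfaces failures instead of letting a node die silently:

class Pipeline(object):
    def __init__(self, *nodes):
        if len(nodes) < 2:
            raise Exception("Need at least two nodes in the pipeline")
        self.start = asyncio.Queue()
        self.tasks = []
        in_ = self.start
        for node in nodes:
            out = asyncio.Queue()
            task = asyncio.async(node(in_, out))
            task.add_done_callback(self._report)  # surface failures
            self.tasks.append(task)
            in_ = out

    def _report(self, task):
        # A failed node would otherwise fail silently in the background.
        if not task.cancelled() and task.exception() is not None:
            print('Pipeline node failed:', task.exception())

    @asyncio.coroutine
    def put(self, val):
        yield from self.start.put(val)

You could go further and re-raise from the callback or stop the loop there, but that's a design decision that depends on how the pipeline is embedded.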