I am trying to learn how to (idiomatically) use Python 3.4's asyncio. My biggest stumbling block is how to "chain" coroutines that continually consume data, update state with it, and allow that state to be used by another coroutine.

The observable behaviour I expect from this example program is simply to periodically report on the sum of numbers received from a subprocess. The reporting should happen at roughly the same rate the Source object receives numbers from the subprocess. IO blocking in the reporting function should not block reading from the subprocess. If the reporting function blocks for longer than an iteration of reading from the subprocess, I don't care if it skips ahead or reports a bunch at once; but there should be about as many iterations of reporter() as there are of expect_exact() over a long enough timeframe.
#!/usr/bin/python3

import asyncio
import pexpect


class Source:
    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()
        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
        p.terminate()


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source  # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)


def main():
    loop = asyncio.get_event_loop()
    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)
    # Again, not sure what goes here...
    asyncio.async(reporter(source))
    loop.run_until_complete(source.run())
    loop.close()


if __name__ == '__main__':
    main()
This example requires pexpect to be installed from git; you could just as easily replace run() with:
    @asyncio.coroutine
    def run(self):
        # (this version also needs an "import random" at module level)
        yield from self.flag.wait()
        while self.flag.is_set():
            value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
            self.sum += value
But the real subprocess I'm interested in needs to be run in a pty, which I think means the supplied subprocess transport/protocol framework in asyncio won't be sufficient for this. The point is that the source of the asynchronous activity is a coroutine that can be used with yield from.
Note that the reporter() function in this example is not valid code; my problem is that I don't know what should go in there. Ideally I'd like to keep the reporter() code separate from run(); the point of this exercise is to see how to factor more complex programs into smaller units of code using the components in asyncio.

Is there a way to structure this kind of behaviour with the asyncio module?
The locking primitives and queues in asyncio itself provide some mechanisms for doing this.
asyncio.Condition() provides a way for a consumer to be notified whenever something has happened. Use this when it doesn't matter if you drop some events (the consumer only ever sees the state as it is at the moment it gets notified).
class Source:
    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0
        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()
        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all()  # Or just notify(), depending on situation
        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here
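For completeness, a minimal sketch of how this version could be driven; the timings and the asyncio.async() call are lifted straight from the question's main(), so treat it as one possible wiring rather than the required one:

def main():
    loop = asyncio.get_event_loop()
    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)
    # Run the consumer as a background task; it sits in source.read()
    # until run() notifies the condition with a fresh sum.
    asyncio.async(reporter(source))
    loop.run_until_complete(source.run())
    loop.close()


if __name__ == '__main__':
    main()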
asyncio.Queue() lets you put your data in a queue (either LIFO or FIFO) and have something else read from it. Use this if you absolutely want to respond to every event, even if your consumer gets behind (in time). Note that if you limit the size of the queue, your producer will eventually block if your consumer is slow enough; a small sketch of that backpressure behaviour follows the full example below.

Note that this also allows us to convert sum to a local variable.
#!/usr/bin/python3

import asyncio
import pexpect


class Source:
    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!
        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()
        sum = 0
        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)
        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here
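As an aside on the bounded-queue point above, here is a small self-contained sketch (not part of the original answer; the coroutine names are made up for illustration) showing the producer being paused by asyncio.Queue(maxsize=1) whenever the consumer has not caught up yet:

import asyncio


@asyncio.coroutine
def fast_producer(queue):
    for i in range(5):
        yield from queue.put(i)      # pauses here whenever the queue is full
        print("produced", i)
    yield from queue.put(None)       # sentinel: tell the consumer to stop


@asyncio.coroutine
def slow_consumer(queue):
    while True:
        item = yield from queue.get()
        if item is None:
            break
        yield from asyncio.sleep(1)  # simulate a slow consumer
        print("consumed", item)


loop = asyncio.get_event_loop()
queue = asyncio.Queue(maxsize=1)     # bounded queue, so put() exerts backpressure
loop.run_until_complete(asyncio.gather(fast_producer(queue),
                                       slow_consumer(queue)))
loop.close()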
Note that Python 3.4.4 adds task_done() and join() methods to the Queue, so that you can gracefully finish processing everything that was queued once you know no more data is coming (where applicable).
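A minimal sketch of that pattern, not from the original answer and with invented coroutine names, might look like this (Python 3.4.4+):

import asyncio


@asyncio.coroutine
def worker(queue):
    # Hypothetical consumer: process items until a None sentinel arrives.
    while True:
        item = yield from queue.get()
        if item is None:
            queue.task_done()        # count the sentinel as processed too
            break
        print("processing", item)
        queue.task_done()            # mark this item as fully handled


@asyncio.coroutine
def main():
    queue = asyncio.Queue()
    asyncio.async(worker(queue))     # schedule the consumer in the background
    for i in range(5):
        yield from queue.put(i)
    yield from queue.put(None)       # signal that no more data is coming
    yield from queue.join()          # block until every item was task_done()'d


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

Once join() returns you know every queued value has been handled, so run_until_complete() can finish and the loop can be closed without abandoning work.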