How do I connect asyncio.coroutines that continually produce and consume data?

I am trying to learn how to (idiomatically) use Python 3.4's asyncio. My biggest stumbling block is how to "chain" coroutines that continually consume data, update state with it, and allow that state to be used by another coroutine.

The observable behaviour I expect from this example program is simply to periodically report on the sum of numbers received from a subprocess. The reporting should happen at roughly the same rate the Source object receives numbers from the subprocess. IO blocking in the reporting function should not block reading from the subprocess. If the reporting function blocks for longer than an iteration of reading from the subprocess, I don't care if it skips ahead or reports a bunch at once; but there should be about as many iterations of reporter() as there are of expect_exact() over a long enough timeframe.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source # ???
        print("New sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()

if __name__ == '__main__':
    main()

This example requires pexpect to be installed from git; you could just as easily replace run() with:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value

But the real subprocess I'm interested in needs to be run in a pty, which I think means the supplied subprocess transport/protocol framework in asyncio won't be sufficient for this. The point is that the source of the asynchronous activity is a coroutine that can be used with yield from.

Note that the reporter() function in this example is not valid code; my problem is that I don't know what should go in there. Ideally I'd like to keep the reporter() code separate from run(); the point of this exercise is to see how to factor out more complex programs into smaller units of code using the components in asyncio.

Is there a way to structure this kind of behaviour with the asyncio module?

asked Mar 29 '15 by detly

1 Answer

The locking primitives and queues in asyncio itself provide some mechanisms for doing this.

Conditions

An asyncio.Condition provides a way for coroutines to be notified when some shared state changes. Use this when it doesn't matter if the consumer drops some events (e.g. if it only ever cares about the latest value).

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here

Queues

An asyncio.Queue lets you put your data in a queue (FIFO by default; asyncio.LifoQueue gives LIFO order) and have something else read from it. Use this if you absolutely want to respond to every event, even if your consumer falls behind in time. Note that if you limit the size of the queue, your producer will eventually block when your consumer is slow enough.

Note that this allows us to convert sum to a local variable too.

#!/usr/bin/python3
import asyncio
import pexpect

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())

@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here

Note that Python 3.4.4 adds task_done() and join() methods to the Queue, to allow you to gracefully finish processing everything when you know the producer is finished (where applicable).
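A rough sketch of that join()/task_done() shutdown pattern, written in modern async/await syntax since the generator style no longer runs on current Python (the values and helper names here are illustrative):

```python
import asyncio

async def main():
    queue = asyncio.Queue()
    processed = []

    async def producer():
        total = 0
        for value in (1, -1, 1):        # stand-in for subprocess output
            total += value
            await queue.put(total)

    async def consumer():
        while True:
            item = await queue.get()
            processed.append(item)
            queue.task_done()           # mark this item as fully handled

    worker = asyncio.create_task(consumer())
    await producer()
    await queue.join()                  # blocks until every item is task_done()
    worker.cancel()                     # the consumer loops forever; stop it
    return processed

print(asyncio.run(main()))
```

queue.join() only returns once task_done() has been called for every item ever put on the queue, which is what lets the producer side decide when it is safe to cancel the consumer.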

answered Oct 19 '22 by detly