Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

chaining coroutines in asyncio (and observer pattern)

I'm having trouble wrapping my head around how coroutines are chained together. In a slightly less trivial example than hello world or factorials, I'd like to have a loop which continually watches file modification times, and then prints out the time whenever the file is touched:

#!/usr/bin/env python3
import os
import asyncio

@asyncio.coroutine
def pathmonitor(path):
    modtime = os.path.getmtime(path)
    while True:
        new_time = os.path.getmtime(path)
        if new_time != modtime:
            modtime = new_time
            yield modtime
        yield from asyncio.sleep(1)

@asyncio.coroutine
def printer():
    while True:
        modtime = yield from pathmonitor('/home/users/gnr/tempfile')
        print(modtime)

loop = asyncio.get_event_loop()
loop.run_until_complete(printer())
loop.run_forever()

I would expect this to work - however, when I run it I get a:

RuntimeError: Task got bad yield: 1426449327.2590399

What am i doing wrong here?

UPDATE: see my answer below for an example of the observer pattern (i.e. efficiently allow multiple registrants to get updates when a file gets touched) without using callbacks (you have to use Tasks).

UPDATE2: there is a better fix for this: 3.5's async for (asynchronous iterators): https://www.python.org/dev/peps/pep-0492/

like image 865
gnr Avatar asked Mar 15 '15 20:03

gnr


People also ask

How does coroutines support cooperative multitasking in Python?

Coroutines work cooperatively multitask by suspending and resuming at set points by the programmer. In Python, coroutines are similar to generators but with few extra methods and slight changes in how we use yield statements. Generators produce data for iteration while coroutines can also consume data.

Which is used to write concurrently in Asyncio?

asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.

Which function is used to run Awaitables concurrently in Asyncio?

gather() method - It runs awaitable objects (objects which have await keyword) concurrently.

Does Asyncio use multiple threads?

These systems will keep track of our non-blocking sockets and notify us when they are ready for us to do something with them. This notification system is the basis of how asyncio is able to achieve concurrency. In asyncio's model of concurrency we have only one thread executing Python at any given time.


2 Answers

I got your code working by using return instead of yield in the chained coroutine, just like the chained coroutines example:

#!/usr/bin/env python3
import os
import asyncio2

@asyncio.coroutine
def pathmonitor(path):
    modtime = os.path.getmtime(path)
    while True:
        new_time = os.path.getmtime(path)
        if new_time != modtime:
            modtime = new_time
            return modtime
        yield from asyncio.sleep(1)


@asyncio.coroutine
def printer():
    while True:
        modtime = yield from pathmonitor('/tmp/foo.txt')
        print(modtime)


loop = asyncio.get_event_loop()
loop.run_until_complete(printer())
loop.run_forever()

Note that printer()'s loop will create a new pathmonitor generator for each iteration. Not sure if this is what you had in mind but it might be a start.

I find the coroutines API and syntax a bit confusing myself. Here's some reading that I have found helpful:

  • What’s New In Python 3.3: "PEP 380: Syntax for Delegating to a Subgenerator"
  • PEP380: "Formal semantics"
  • asyncio: "Example: Chain coroutines"
  • Greg Ewing's "Binary Tree" example
like image 111
André Laszlo Avatar answered Oct 25 '22 13:10

André Laszlo


As others pointed out, my mistake was that I was trying to use a coroutine like a generator. Instead of relying on a generator for iteration, I needed to create multiple coroutines. Also, I needed to use tasks to implement the observer pattern without callbacks since multiple registrants can yield from the same task. My pathmonitor looks something like this:

import os
import asyncio

class PathInfo:

    def __init__(self, path):
        self.path = path
        self.modtime = os.path.getmtime(path)
        self.startTask()

    def startTask(self):
        self.task = asyncio.async(self._checkIfTouched())

    def _checkIfTouched(self):
        while True:
            yield from asyncio.sleep(1)
            newtime = os.path.getmtime(self.path)
            if self.modtime != newtime:
                self.modtime = newtime
                return newtime

class PathMonitor:

    def __init__(self):
        self._info = {}

    @asyncio.coroutine
    def wasTouched(self, path):
        try:
            info = self._info[path]
        except KeyError:
            self._info[path] = info = PathInfo(path)
        if info.task.done():
            info.startTask()
        modtime = yield from info.task
        return modtime

def printer():
    while True:
        modtime = yield from mon.wasTouched('/tmp/myfile')
        print(modtime)

mon = PathMonitor()

loop = asyncio.get_event_loop()
asyncio.async(printer())
loop.run_forever()
like image 21
gnr Avatar answered Oct 25 '22 12:10

gnr