
Python 3.4 asyncio task doesn't get fully executed

I'm experimenting with Python 3.4's asyncio module. Since there's no production-ready package for MongoDB using asyncio, I have written a small wrapper class that executes all mongo queries in an executor. This is the wrapper:

import asyncio
from functools import wraps
from pymongo import MongoClient


class AsyncCollection(object):
    def __init__(self, client):
        self._client = client
        self._loop = asyncio.get_event_loop()

    def _async_deco(self, name):
        method = getattr(self._client, name)

        @wraps(method)
        @asyncio.coroutine
        def wrapper(*args, **kwargs):
            print('starting', name, self._client)
            r = yield from self._loop.run_in_executor(None, method, *args, **kwargs)
            print('done', name, self._client, r)
            return r

        return wrapper

    def __getattr__(self, name):
        return self._async_deco(name)


class AsyncDatabase(object):
    def __init__(self, client):
        self._client = client
        self._collections = {}


    def __getitem__(self, col):
        return self._collections.setdefault(col, AsyncCollection(self._client[col]))


class AsyncMongoClient(object):
    def __init__(self, host, port):
        self._client = MongoClient(host, port)
        self._loop = asyncio.get_event_loop()
        self._databases = {}

    def __getitem__(self, db):
        return self._databases.setdefault(db, AsyncDatabase(self._client[db]))

I want to execute inserts asynchronously, meaning that the coroutine that issues them shouldn't have to wait for them to complete. The asyncio manual states: "A task is automatically scheduled for execution when it is created. The event loop stops when all tasks are done." So I constructed this test script:

from asyncdb import AsyncMongoClient
import asyncio

@asyncio.coroutine
def main():
    print("Started")
    mongo = AsyncMongoClient("host", 27017)
    asyncio.async(mongo['test']['test'].insert({'_id' : 'test'}))
    print("Done")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

When I run the script I get the following result:

Started
Done
starting insert Collection(Database(MongoClient('host', 27017), 'test'), 'test')

There should be a line indicating that the mongo query is done. I can see that line when I yield from this coroutine instead of running it using asyncio.async. However, what's really odd is that the test entry actually exists in MongoDB when I run this coroutine using asyncio.async. So although the insert seems to work, I don't understand why I can't see the print statement indicating that the query has been performed. Since I run the event loop using run_until_complete, I expected it to wait for the insert task to complete, even if the main coroutine finished earlier.

reish asked Apr 20 '14 10:04


1 Answer

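asyncio.async(mongo...) just schedules the mongo query, and run_until_complete() doesn't wait for it: run_until_complete(main()) returns as soon as main() itself finishes. The insert still reaches MongoDB because it is already running in an executor thread, but the loop stops before the wrapper coroutine resumes and prints 'done'. Here's a code example that shows this using the asyncio.sleep() coroutine: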

#!/usr/bin/env python3
import asyncio
from contextlib import closing
from timeit import default_timer as timer

@asyncio.coroutine
def sleep_BROKEN(n):
    # schedule coroutine; it runs on the next yield
    asyncio.async(asyncio.sleep(n))

@asyncio.coroutine
def sleep(n):
    yield from asyncio.sleep(n)

@asyncio.coroutine
def double_sleep(n):
    f = asyncio.async(asyncio.sleep(n))
    yield from asyncio.sleep(n) # the first sleep is also started
    yield from f

n = 2
with closing(asyncio.get_event_loop()) as loop:
    start = timer()
    loop.run_until_complete(sleep_BROKEN(n))
    print(timer() - start)
    loop.run_until_complete(sleep(n))
    print(timer() - start)
    loop.run_until_complete(double_sleep(n))
    print(timer() - start)

Output

0.0001221800921484828
2.002586881048046
4.005100341048092

The output shows that run_until_complete(sleep_BROKEN(n)) returns in about 0.1 milliseconds instead of 2 seconds, while run_until_complete(sleep(n)) works as it should: it returns in 2 seconds. The printed times are cumulative, so double_sleep() also takes about 2 seconds, not 4: coroutines scheduled by asyncio.async() start running at the next yield from, and the two sleeps run concurrently. If you add a blocking delay (one that doesn't let the event loop run) before the first yield from, you'll see that yield from f doesn't return any sooner, i.e., asyncio.async doesn't run the coroutine; it only schedules it to run.
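
On current Python versions asyncio.async no longer parses, because async became a reserved keyword in Python 3.7; asyncio.ensure_future() or asyncio.create_task() take its place. Below is a minimal sketch of the same effect and one way to fix it, assuming Python 3.7+. The names fake_insert, main_broken, main_fixed and run are hypothetical, and asyncio.sleep() stands in for the mongo insert; this is not the asker's actual wrapper.

```python
import asyncio


async def fake_insert(log, delay=0.05):
    # Hypothetical stand-in for the wrapped mongo insert.
    log.append("starting")
    await asyncio.sleep(delay)
    log.append("done")


async def main_broken(log):
    # Schedules the task, then returns without waiting for it,
    # so the loop may stop before the task finishes.
    asyncio.create_task(fake_insert(log))


async def main_fixed(log):
    # Keep a reference to the task and wait for it before returning.
    task = asyncio.create_task(fake_insert(log))
    # ... other work could run here while the insert is in flight ...
    await task


def run(main):
    # Run one of the main coroutines and return what it logged.
    log = []
    asyncio.run(main(log))
    return log


if __name__ == "__main__":
    print("broken:", run(main_broken))
    print("fixed: ", run(main_fixed))
```

Here run(main_broken) should leave the log without a "done" entry, while run(main_fixed) records both steps. asyncio.run() cancels whatever is still pending when the main coroutine returns, so fire-and-forget work needs something that eventually awaits it, for example asyncio.gather() over the outstanding tasks.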

jfs answered Oct 04 '22 22:10