Python 3.5 async for blocks the ioloop

I have a simple aiohttp server with two handlers. The first does some computation inside an async for loop; the second just returns a text response. not_so_long_operation returns the 30th Fibonacci number using the slowest recursive implementation, which takes about one second.

def not_so_long_operation():
    return fib(30)

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

    async def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            return i
        else:
            raise StopAsyncIteration

# GET /
async def index(request):
    print('request!')
    l = []
    async for i in arange(20):
        print(i)
        l.append(not_so_long_operation())

    return aiohttp.web.Response(text='%d\n' % l[0])

# GET /lol/
async def lol(request):
    print('request!')
    return aiohttp.web.Response(text='just respond\n')
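
The fib helper is not shown in the snippet above. A minimal sketch of the naive recursive version described in the question, assuming the usual convention fib(0) = 0 and fib(1) = 1, might look like this:

```python
def fib(n):
    """Return the n-th Fibonacci number by naive double recursion.

    Assumes fib(0) == 0 and fib(1) == 1; the original helper is not shown.
    Runs in exponential time, so it keeps the CPU busy without ever awaiting.
    """
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)
```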

When I fetch / and then /lol/, I get the response for the second request only after the first one has finished.
What am I doing wrong, and how can I make the index handler release the ioloop on each iteration?

Michael Ihnatenko asked Jan 08 '23 06:01


2 Answers

Your example has no yield points (await statements) at which the event loop could switch between tasks. An asynchronous iterator lets you use await inside __anext__ (and, before Python 3.5.2, inside __aiter__), but it does not insert one into your code automatically.

Say,

class arange:
    def __init__(self, n):
        self.n = n
        self.i = 0

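    # Since Python 3.5.2, __aiter__ should be a plain method that returns the
    # asynchronous iterator directly; Python 3.7+ rejects a coroutine here.
    def __aiter__(self):
        return self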

    async def __anext__(self):
        i = self.i
        self.i += 1
        if self.i <= self.n:
            await asyncio.sleep(0)  # insert yield point
            return i
        else:
            raise StopAsyncIteration

should work as you expect.

In a real application you most likely won't need await asyncio.sleep(0) calls, because the handler will already be awaiting database access and similar I/O.
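
To see the effect of the yield point in isolation, here is a small sketch that runs two consumers of the iterator concurrently, without aiohttp. The names worker and run_demo are made up for illustration, and asyncio.run assumes Python 3.7 or newer.

```python
import asyncio


class arange:
    """Async counterpart of range(n) that yields to the event loop on every step."""

    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        i = self.i
        self.i += 1
        await asyncio.sleep(0)  # yield point: other ready tasks get a turn here
        return i


async def worker(name, n, log):
    # Hypothetical consumer: records each value together with the worker's name.
    async for i in arange(n):
        log.append((name, i))


def run_demo():
    # Hypothetical driver: runs two workers on one event loop and returns the log.
    log = []

    async def main():
        await asyncio.gather(worker("a", 3, log), worker("b", 3, log))

    asyncio.run(main())
    return log


if __name__ == "__main__":
    print(run_demo())
```

With the sleep(0) line in place, the entries of the two workers alternate in the log. Without it, one worker would run to completion before the other starts. Note that this only gives other tasks a turn between iterations; a long computation inside a single iteration still blocks the loop while it runs.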

Andrew Svetlov answered Jan 09 '23 21:01


Since fib(30) is CPU-bound and shares little data, you should probably use a ProcessPoolExecutor (as opposed to a ThreadPoolExecutor):

async def index(request):
    loop = request.app.loop
    executor = request.app["executor"]
    result = await loop.run_in_executor(executor, fib, 30)
    return web.Response(text="%d" % result)

Set up the executor when you create the app:

from concurrent.futures import ProcessPoolExecutor

app = web.Application(...)
app["executor"] = ProcessPoolExecutor()
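
The same pattern can be tried without aiohttp. The sketch below is only an illustration under a few assumptions: fib is a naive recursive implementation (not shown in the question), compute and main are hypothetical names, and asyncio.run and asyncio.get_running_loop require Python 3.7 or newer.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    # Naive recursive Fibonacci, assumed here; must be a module-level
    # function so it can be pickled and sent to a worker process.
    return n if n < 2 else fib(n - 1) + fib(n - 2)


async def compute(executor, n):
    # Hand the CPU-bound call to the executor and await its result,
    # which leaves the event loop free to serve other tasks meanwhile.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, fib, n)


async def main():
    # One shared pool, as in the app["executor"] setup above.
    with ProcessPoolExecutor(max_workers=2) as executor:
        return await asyncio.gather(compute(executor, 20), compute(executor, 21))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The `if __name__ == "__main__"` guard matters with process pools: on platforms that start workers by re-importing the main module, unguarded top-level code would run again in every worker.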

Jashandeep Sohi answered Jan 09 '23 19:01