Detect an idle asyncio event loop

Is there some programming pattern that would enable me to detect when an asyncio event loop becomes idle in the following sense? Say that my execution path branches in some complex way, for example using asyncio.gather(), but I know that each branch eventually awaits some idle coroutine, such as a read from a socket or a subprocess. Say I know these coroutines never actually yield a result, so the event loop will execute whatever Python code it can but will eventually just wait for those idle coroutines. Is there a programmatic way to detect such a state and stop the loop?

jhrmnn asked Nov 18 '17 17:11



1 Answer

As pointed out by Philip Couling, the solution presented below doesn't work. StackOverflow doesn't allow deleting an accepted answer, so I am adding this disclaimer.


What you call "idle" might be more accurately described as "waiting for IO or a timeout". In correctly written asyncio code one shouldn't need to detect that the loop is in that state because it shouldn't matter - the loop is doing its job, and it is up to the tools like asyncio.gather, asyncio.wait, and loop.run_until_complete to ensure that it ends at the appropriate time. However, things are not always perfect, and if you really want to do that, it is certainly possible.
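As a minimal sketch of letting asyncio.wait decide when to stop, the snippet below bounds a group of tasks with a deadline and cancels whatever is still pending. Note that this is a timeout, not idle detection, and the names finishes_quickly, waits_forever, and run_with_deadline are hypothetical stand-ins for the branches in the question.

```python
import asyncio

async def finishes_quickly(n):
    # Stand-in for a branch that completes on its own.
    await asyncio.sleep(0)
    return n

async def waits_forever():
    # Stand-in for a branch stuck on IO that never arrives.
    await asyncio.Event().wait()

async def run_with_deadline(timeout):
    tasks = [
        asyncio.create_task(finishes_quickly(1)),
        asyncio.create_task(finishes_quickly(2)),
        asyncio.create_task(waits_forever()),
    ]
    # Wait until every task is done or the deadline passes.
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    # Cancel the stragglers and let the cancellations settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(task.result() for task in done), len(pending)

if __name__ == "__main__":
    print(asyncio.run(run_with_deadline(0.1)))
```

The obvious trade-off is that the deadline has to be chosen up front, whereas the question asks for a way to stop as soon as nothing else can run.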

At each step, the event loop checks for tasks that are ready to run. If there are any, their steps are invoked. Once no more tasks are ready, the event loop waits for an IO event or the soonest timeout, whichever happens first. The important thing to notice is that running tasks always have precedence over waiting for IO. Therefore, to detect the case when no tasks are ready, one can schedule a dummy IO event that is known to fire immediately.
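The iteration described above can be modelled in a few lines. This is a toy sketch, not asyncio's actual implementation, and run_once, ready, and demo are hypothetical names. One detail it illustrates: when callbacks are pending, the selector is still polled, with a zero timeout, so an IO event that is already ready is picked up in the same iteration. That is consistent with the disclaimer at the top of this answer.

```python
import collections
import selectors
import socket

def run_once(ready, selector):
    """One iteration of a simplified event loop (toy model only)."""
    # Poll without blocking if callbacks are pending; otherwise block for IO.
    timeout = 0 if ready else None
    for key, _mask in selector.select(timeout):
        ready.append(key.data)  # queue the callback registered for this fd
    # Run only the callbacks that were queued at this point.
    for _ in range(len(ready)):
        ready.popleft()()

def demo():
    log = []
    ready = collections.deque([lambda: log.append("task")])
    selector = selectors.DefaultSelector()
    rsock, wsock = socket.socketpair()
    wsock.send(b"x")  # rsock is now readable
    selector.register(rsock, selectors.EVENT_READ, lambda: log.append("io"))
    run_once(ready, selector)
    selector.close()
    rsock.close()
    wsock.close()
    return log

if __name__ == "__main__":
    print(demo())
```

In this model the pending callback and the IO callback both run within a single iteration, so a ready IO event does not wait for the task queue to drain.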

The following coroutine sets up such an event and waits for it to trigger:

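import asyncio
import socket

async def detect_iowait():
    loop = asyncio.get_event_loop()
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)  # loop.sock_recv() requires a non-blocking socket
    # close the write end so that a read on rsock returns EOF right away
    wsock.close()
    await loop.sock_recv(rsock, 1)
    rsock.close()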

It sets up a socket pair where reading from one socket returns the data written into the other socket. It immediately closes one of the sockets, so that reading from the other one immediately returns EOF, represented as an empty bytes object. Awaiting a read from that socket is fundamentally non-blocking - but asyncio doesn't know that, so it places the socket on the IO wait list. As noted above, as soon as no runnable tasks are present, asyncio will wait for IO, the read on the socket will complete, and detect_iowait will exit. Thus awaiting detect_iowait() itself detects IO wait.
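The EOF behaviour can be checked with plain blocking sockets, outside asyncio. The helper below is a small sketch and its name, read_after_peer_close, is made up for illustration: it closes one end of a socket pair and reads from the other.

```python
import socket

def read_after_peer_close():
    rsock, wsock = socket.socketpair()
    wsock.close()  # the peer is gone, so rsock is at EOF
    try:
        # Returns immediately with an empty bytes object instead of blocking.
        return rsock.recv(1)
    finally:
        rsock.close()

if __name__ == "__main__":
    print(read_after_peer_close())
```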

Test code that uses detect_iowait() might look like this:

# stop loop.run_forever once iowait is detected
async def stop_on_iowait():
    await detect_iowait()
    print('iowait detected, stopping!')
    asyncio.get_event_loop().stop()

# a dummy calculation coroutine, emulating your execution path
async def calc(n):
    print('calc %d start' % n)
    async def noop():
        pass
    for i in range(n):
        await noop()
    print('calc %d end' % n)

# coroutine that waits on IO forever, also (ab)using a socket pair,
# this time creating a socket whose recv will never complete
async def io_forever():
    loop = asyncio.get_event_loop()
    sock, _ = socket.socketpair()
    sock.setblocking(False)
    await loop.sock_recv(sock, 1)

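# create the loop explicitly; calling get_event_loop() with no running loop is deprecated
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)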
for t in calc(1000), calc(10000), calc(100000), io_forever():
    loop.create_task(t)
loop.create_task(stop_on_iowait())
loop.run_forever()
user4815162342 answered Oct 14 '22 23:10
