Asyncio exception handler: not getting called until event loop thread stopped

I am setting an exception handler on my asyncio event loop. However, it doesn't seem to be called until the event loop thread is stopped. For example, consider this code:

import asyncio
from threading import Thread

def exception_handler(loop, context):
    print('Exception handler called')

loop = asyncio.get_event_loop()

loop.set_exception_handler(exception_handler)

thread = Thread(target=loop.run_forever)
thread.start()

async def run():
    raise RuntimeError()

asyncio.run_coroutine_threadsafe(run(), loop)

loop.call_soon_threadsafe(loop.stop, loop)

thread.join()

This code prints "Exception handler called", as we might expect. However, if I remove the line that shuts down the event loop (loop.call_soon_threadsafe(loop.stop, loop)), it no longer prints anything.

I have a few questions about this:

  • Am I doing something wrong here?

  • Does anyone know if this is the intended behaviour of asyncio exception handlers? I can't find anything that documents this, and it seems a little strange to me.

I'd quite like to have a long-running event loop that logs errors happening in its coroutines, so the current behaviour seems problematic for me.

Neil asked Nov 21 '17 12:11



1 Answer

There are a few problems in the code above:

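  • loop.stop() does not take a parameter. Calling it as loop.stop(loop) raises a TypeError inside the loop, and that TypeError, rather than the RuntimeError from run(), appears to be what triggers your exception handler.
  • The loop is stopped before the coroutine is executed (stop() is scheduled before the coroutine gets a chance to run).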

Here is the fixed code (without exceptions and the exception handler):

import asyncio
from threading import Thread


async def coro():
    print("in coro")
    return 42


loop = asyncio.get_event_loop()
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(coro(), loop)

print(fut.result())

loop.call_soon_threadsafe(loop.stop)

thread.join()

run_coroutine_threadsafe() returns a concurrent.futures.Future object which holds the exception (it does not get to the loop's exception handler):

import asyncio
from pprint import pprint
from threading import Thread


def exception_handler(loop, context):
    print('Exception handler called')
    pprint(context)


loop = asyncio.get_event_loop()

loop.set_exception_handler(exception_handler)

thread = Thread(target=loop.run_forever)
thread.start()


async def coro():
    print("coro")
    raise RuntimeError("BOOM!")


fut = asyncio.run_coroutine_threadsafe(coro(), loop)
try:
    print("success:", fut.result())
except Exception:
    print("exception:", fut.exception())

loop.call_soon_threadsafe(loop.stop)

thread.join()
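
If you would rather log failures without blocking on fut.result(), one option is to attach a done-callback to the future returned by run_coroutine_threadsafe(). The following is only a sketch: log_failure, failing and the errors list are names made up for this illustration, and it creates a fresh loop with asyncio.new_event_loop() so that it runs on its own.

```python
import asyncio
import concurrent.futures
from threading import Thread

errors = []  # hypothetical sink; a real program would use logging instead


def log_failure(fut: concurrent.futures.Future) -> None:
    """Done-callback: record the exception, if any, held by the future."""
    if fut.cancelled():
        return
    exc = fut.exception()
    if exc is not None:
        errors.append(exc)
        print("coroutine failed:", repr(exc))


async def failing():
    raise RuntimeError("BOOM!")


loop = asyncio.new_event_loop()
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(failing(), loop)
fut.add_done_callback(log_failure)

# Wait for completion without re-raising the coroutine's exception here.
concurrent.futures.wait([fut])

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

The callback runs in whichever thread completes the future (here the loop thread), so it should stay short and thread-safe.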

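However, a coroutine scheduled with create_task() or ensure_future() whose exception is never retrieved does reach the exception_handler. As far as I can tell, asyncio reports it only when the task object is garbage-collected, so the call can come later than the failure itself: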

async def coro2():
    print("coro2")
    raise RuntimeError("BOOM2!")


async def coro():
    loop.create_task(coro2())
    print("coro")
    raise RuntimeError("BOOM!")

You can use this to create a small wrapper:

async def boom(x):
    print("boom", x)
    raise RuntimeError("BOOM!")


async def call_later(coro, *args, **kwargs):
    loop.create_task(coro(*args, **kwargs))
    return "ok"


fut = asyncio.run_coroutine_threadsafe(call_later(boom, 7), loop)
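
The wrapper above leaves it to asyncio to notice the unretrieved task exception. If you want the handler to fire at a predictable moment, a more explicit variant is to catch the exception yourself and pass it to loop.call_exception_handler(). This is a sketch, not the only way to do it: report_errors and the seen list are hypothetical names, and asyncio.get_running_loop() assumes Python 3.7 or later.

```python
import asyncio
from threading import Thread

seen = []  # contexts received by the handler, kept for inspection


def exception_handler(loop, context):
    seen.append(context)
    print("Exception handler called:", context["message"])


async def report_errors(coro):
    """Await coro; on failure, hand the exception to the loop's handler."""
    try:
        return await coro
    except Exception as exc:
        asyncio.get_running_loop().call_exception_handler({
            "message": "Unhandled exception in coroutine",
            "exception": exc,
        })
        raise  # still propagate to whoever holds the future


async def boom(x):
    print("boom", x)
    raise RuntimeError("BOOM!")


loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(report_errors(boom(7)), loop)
try:
    fut.result()
except RuntimeError:
    pass  # already reported through the exception handler

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

Because the handler is invoked inside the except block, it has already run by the time fut.result() raises in the calling thread.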

However, you should probably consider using a Queue to communicate with your thread instead.
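
One way to read the Queue suggestion is sketched below; it is my interpretation rather than a prescribed design. The main thread puts jobs on a thread-safe queue.Queue, and a worker thread that owns its own event loop pulls them off and reports results or errors on a second queue. The names jobs, results, handle and worker, and the None sentinel used for shutdown, are all assumptions made for this example.

```python
import asyncio
import queue
from threading import Thread

jobs = queue.Queue()     # main thread -> worker thread
results = queue.Queue()  # worker thread -> main thread


async def handle(job):
    if job < 0:
        raise ValueError(f"negative job: {job}")
    return job * 2


async def worker():
    loop = asyncio.get_running_loop()
    while True:
        # The blocking get() runs in the default executor so the loop stays free.
        job = await loop.run_in_executor(None, jobs.get)
        if job is None:  # sentinel: shut down
            break
        try:
            results.put(("ok", await handle(job)))
        except Exception as exc:
            # Errors are reported as data instead of being lost in the loop.
            results.put(("error", repr(exc)))


thread = Thread(target=asyncio.run, args=(worker(),))
thread.start()

for job in (1, -1, 3):
    jobs.put(job)
jobs.put(None)
thread.join()

outcomes = [results.get_nowait() for _ in range(3)]
print(outcomes)
```

With this layout every failure comes back to the main thread as an ordinary value, so logging does not depend on the loop's exception handler at all.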

Udi answered Oct 02 '22 16:10