I am setting an exception handler on my asyncio event loop. However, it doesn't seem to be called until the event loop thread is stopped. For example, consider this code:
import asyncio
from threading import Thread

def exception_handler(loop, context):
    print('Exception handler called')

loop = asyncio.get_event_loop()
loop.set_exception_handler(exception_handler)
thread = Thread(target=loop.run_forever)
thread.start()

async def run():
    raise RuntimeError()

asyncio.run_coroutine_threadsafe(run(), loop)
loop.call_soon_threadsafe(loop.stop, loop)
thread.join()
This code prints "Exception handler called", as we might expect. However, if I remove the line that shuts down the event loop (loop.call_soon_threadsafe(loop.stop, loop)), it no longer prints anything.
I have a few questions about this:
Am I doing something wrong here?
Does anyone know if this is the intended behaviour of asyncio exception handlers? I can't find anything that documents this, and it seems a little strange to me.
I'd quite like to have a long-running event loop that logs errors happening in its coroutines, so the current behaviour seems problematic for me.
There are a few problems in the code above:
stop() does not need a parameter.
The loop is asked to stop before the coroutine has had a chance to run (stop() was called before it).
Here is the fixed code (without exceptions and the exception handler):
import asyncio
from threading import Thread

async def coro():
    print("in coro")
    return 42

# get_event_loop() is deprecated when no loop is running or set, so create one explicitly
loop = asyncio.new_event_loop()
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(coro(), loop)
print(fut.result())  # blocks until the coroutine has finished

loop.call_soon_threadsafe(loop.stop)
thread.join()
run_coroutine_threadsafe() returns a concurrent.futures.Future object which holds the exception (it does not get to the loop's exception handler):
import asyncio
from pprint import pprint
from threading import Thread

def exception_handler(loop, context):
    print('Exception handler called')
    pprint(context)

# get_event_loop() is deprecated when no loop is running or set, so create one explicitly
loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)
thread = Thread(target=loop.run_forever)
thread.start()

async def coro():
    print("coro")
    raise RuntimeError("BOOM!")

fut = asyncio.run_coroutine_threadsafe(coro(), loop)
try:
    print("success:", fut.result())
except Exception:
    # the exception is stored on the future, so the handler is not called
    print("exception:", fut.exception())

loop.call_soon_threadsafe(loop.stop)
thread.join()
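If blocking on fut.result() is not an option, the future returned by run_coroutine_threadsafe() also accepts a done callback. The following is only a sketch of that idea: the names report and errors are made up for illustration, and a real application would probably call a logger instead of appending to a list.

```python
import asyncio
from threading import Thread

errors = []  # hypothetical sink; stands in for a logger

def report(fut):
    # Called once the concurrent.futures.Future has completed.
    if fut.cancelled():
        return
    exc = fut.exception()
    if exc is not None:
        errors.append(repr(exc))

async def coro():
    raise RuntimeError("BOOM!")

loop = asyncio.new_event_loop()
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(coro(), loop)
fut.add_done_callback(report)

# Only for this demo: wait for the coroutine to finish before stopping the loop.
fut.exception(timeout=5)

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
print(errors)
```

The callback runs as soon as the coroutine finishes, so a long-running loop does not have to be stopped for the error to show up.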
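However, exceptions in tasks started with create_task() or ensure_future() do reach the exception_handler when nobody retrieves them (asyncio reports them once the finished task object is garbage-collected):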
# relies on the loop and exception_handler set up above
async def coro2():
    print("coro2")
    raise RuntimeError("BOOM2!")

async def coro():
    loop.create_task(coro2())
    print("coro")
    raise RuntimeError("BOOM!")
You can use this to create a small wrapper:
async def boom(x):
    print("boom", x)
    raise RuntimeError("BOOM!")

async def call_later(coro, *args, **kwargs):
    # the task's exception is never retrieved, so it ends up in the exception handler
    loop.create_task(coro(*args, **kwargs))
    return "ok"

fut = asyncio.run_coroutine_threadsafe(call_later(boom, 7), loop)
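A variation on this wrapper, in case you want errors reported the moment they happen rather than when the task object is cleaned up: await the coroutine inside a try/except and pass any failure to loop.call_exception_handler() yourself. This is a rough sketch, not the only way to do it. The helper name reported, the message text and the seen list are my own placeholders.

```python
import asyncio
from threading import Thread

seen = []  # hypothetical sink so the outcome can be inspected

def exception_handler(loop, context):
    seen.append(context["message"])

async def reported(coro):
    # Hypothetical helper: await the coroutine and forward any failure
    # to the loop's exception handler immediately.
    try:
        return await coro
    except Exception as exc:
        asyncio.get_running_loop().call_exception_handler(
            {"message": "Unhandled exception in coroutine", "exception": exc}
        )

async def boom(x):
    raise RuntimeError("BOOM!")

loop = asyncio.new_event_loop()
loop.set_exception_handler(exception_handler)
thread = Thread(target=loop.run_forever)
thread.start()

fut = asyncio.run_coroutine_threadsafe(reported(boom(7)), loop)
fut.result(timeout=5)  # returns None here: the error was already reported

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

With this approach the handler is invoked from inside the loop thread while the loop keeps running.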
However, you should probably consider using a Queue to communicate with your thread instead.
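One possible reading of the Queue suggestion, as a sketch only: a worker coroutine inside the loop pulls jobs from an asyncio.Queue and handles failures itself. I am assuming asyncio.Queue here. Since it is not thread-safe, the other thread feeds it through loop.call_soon_threadsafe(). The names worker, make_queue and results, and the None sentinel, are all illustrative.

```python
import asyncio
from threading import Thread

results = []  # hypothetical sink; stands in for a logger

async def worker(queue):
    # Runs inside the loop: awaits each job and records failures itself.
    while True:
        job = await queue.get()
        if job is None:  # sentinel chosen for this sketch: stop the worker
            break
        try:
            results.append(await job())
        except Exception as exc:
            results.append(f"failed: {exc!r}")

async def make_queue():
    # Create the queue inside the loop so it is bound to the right loop.
    return asyncio.Queue()

async def ok():
    return 42

async def boom():
    raise RuntimeError("BOOM!")

loop = asyncio.new_event_loop()
thread = Thread(target=loop.run_forever)
thread.start()

queue = asyncio.run_coroutine_threadsafe(make_queue(), loop).result(timeout=5)
worker_fut = asyncio.run_coroutine_threadsafe(worker(queue), loop)

# asyncio.Queue is not thread-safe, so hand items over via the loop.
for job in (ok, boom, None):
    loop.call_soon_threadsafe(queue.put_nowait, job)

worker_fut.result(timeout=5)  # wait until the worker has seen the sentinel
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
print(results)
```

A failing job does not stop the worker, so the loop can keep running and keep reporting errors.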