In a python aysncio application comprising multiple tasks, what is the correct way to report an unhandled exception in one of the tasks then exit the application? I have looked at several related questions, this one in particular:
How to shutdown the loop and print error if coroutine raised an exception with asyncio?
... but the best that gets me is shutting the application down with runtime errors.
For example, the following code will create 4 tasks then run a main loop. The tasks will throw an exception (AssertionError) after a period of time, to simulate a "this should never happen" event which I need to trigger an orderly shutdown of the application.
At present, this code will trigger the exception within its task, and that task will abort. Without the custom exception handler the other tasks will continue, but with it at least the application will abort, but it will do so with "RuntimeError: Event loop stopped before Future completed."
I'm testing in Python 3.7
#!/usr/bin/python3
import asyncio
class Test:
def __init__(self, loop):
loop.create_task( self.test(3))
loop.create_task( self.test(4))
loop.create_task( self.test(5))
loop.create_task( self.test(7))
def __enter__(self):
return self
def __exit__(self, type, value, tb):
pass
async def test(self, max):
i = 0
while True:
i = i + 1
print("Loop %d of %d" %(i,max))
assert i < max
await asyncio.sleep(1)
async def main(self):
while True:
print("Main loop doing stuff")
await asyncio.sleep(0.5)
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
loop.stop()
loop = asyncio.get_event_loop()
loop.set_exception_handler(custom_exception_handler)
with Test(loop) as t:
loop.run_until_complete( t.main() )
Instead of stopping the loop in your event handler, just catch the exception in test() and terminate main() gracefully. You see a traceback for the assertion error because the default exception handler does that. Everything else will happen automatically thanks to the run_until_complete function, and you get no RuntimeExceptions or warnings.
In general you don't want an exception in Task A to shutdown other tasks, but your application logic may require that sort of thing. In that case you need to explicitly code a mechanism to do that. Usually there is a better way to terminate the loop than loop.stop(), unless you started the loop with run_forever().
You might also look into Task.cancel(), which could be useful in more complex cases. If you decide to use that function, the task being cancelled should handle asyncio.CancelledError for a graceful termination. Note that is NOT a subclass of Exception but of RuntimeException, a little detail that tripped me up the first time.
I used Python3.8 on Win10, as my code indicates, but I don't think that will matter.
#! python3.8
import asyncio
class Test:
def __init__(self, loop):
self.running = True
loop.create_task(self.test(3))
loop.create_task(self.test(4))
loop.create_task(self.test(5))
loop.create_task(self.test(7))
def __enter__(self):
return self
def __exit__(self, *_x):
pass
async def test(self, mx):
try:
i = 0
while True:
i = i + 1
print("Loop %d of %d" %(i, mx))
assert i < mx
await asyncio.sleep(1)
except Exception:
self.running = False
raise
async def main(self):
while self.running:
print("Main loop doing stuff")
await asyncio.sleep(0.5)
loop = asyncio.get_event_loop()
with Test(loop) as t:
loop.run_until_complete(t.main())
Here is a second solution using the Task.cancel() function:
#! python3.8
import asyncio
class Test:
def __init__(self, loop):
self.main_task = loop.create_task(self.main())
loop.set_exception_handler(self.custom_exception_handler)
loop.create_task(self.test(3))
loop.create_task(self.test(4))
loop.create_task(self.test(5))
loop.create_task(self.test(7))
def __enter__(self):
return self
def __exit__(self, *_x):
pass
async def test(self, mx):
i = 0
while True:
i = i + 1
print("Loop %d of %d" %(i, mx))
assert i < mx
await asyncio.sleep(1)
async def main(self):
try:
while True:
print("Main loop doing stuff")
await asyncio.sleep(0.5)
except asyncio.CancelledError:
pass
def custom_exception_handler(self, loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
self.main_task.cancel()
loop = asyncio.get_event_loop()
with Test(loop) as t:
loop.run_until_complete(t.main_task)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With