How to shut down the loop and print the error if a coroutine raises an exception with asyncio?

Suppose I have a few coroutines running in a loop. How can I make the whole program fail with the exception if any one of them raises? Right now asyncio doesn't even print the error messages from coroutines unless I set the logging level to "DEBUG".

    from asyncio import get_event_loop, sleep


    async def c(sleep_time=2, fail=False):
        print('c', sleep_time, fail)
        if fail:
            raise Exception('fail')
        while True:
            print('doing stuff')
            await sleep(sleep_time)


    loop = get_event_loop()
    loop.create_task(c(sleep_time=10, fail=False))
    loop.create_task(c(fail=True))
    loop.run_forever()
user1685095 asked Apr 04 '17 12:04



2 Answers

A graceful way is to use the event loop's error handling API.

https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api

Example:

    import asyncio


    async def run_division(a, b):
        await asyncio.sleep(2)
        return a / b


    def custom_exception_handler(loop, context):
        # First, let the default handler log the error.
        loop.default_exception_handler(context)

        exception = context.get('exception')
        if isinstance(exception, ZeroDivisionError):
            print(context)
            loop.stop()


    # Create the loop explicitly; relying on get_event_loop() to create one
    # is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Set custom handler
    loop.set_exception_handler(custom_exception_handler)
    loop.create_task(run_division(1, 0))
    loop.run_forever()
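The handler above stops the loop, but the program itself still exits normally. As a possible extension (a sketch, not part of the original answer), the handler can record the exception so that it is re-raised once run_forever() returns. The names worker and run_until_first_error are hypothetical. Note that the loop only calls the handler for exceptions nobody retrieved, which in practice happens when the finished task object is garbage-collected, so this sketch deliberately keeps no reference to the tasks.

```python
import asyncio


async def worker(fail=False):
    # Hypothetical stand-in for the question's coroutine `c`.
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError("fail")
    while True:
        await asyncio.sleep(0.01)


def run_until_first_error(*coros):
    """Run coroutines on a fresh loop and re-raise the first unhandled exception."""
    loop = asyncio.new_event_loop()
    errors = []

    def handler(loop, context):
        loop.default_exception_handler(context)  # still log the error
        exc = context.get("exception")
        if exc is not None:
            errors.append(exc)
            loop.stop()

    loop.set_exception_handler(handler)
    for coro in coros:
        loop.create_task(coro)  # deliberately not keeping a reference
    try:
        loop.run_forever()
        # Restore the default handler, then cancel whatever is still running.
        loop.set_exception_handler(None)
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
    finally:
        loop.close()
    if errors:
        raise errors[0]
```

Calling run_until_first_error(worker(), worker(fail=True)) should log the error through the default handler and then raise the RuntimeError in the caller, so an uncaught failure ends the program with a traceback.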
Char answered Sep 17 '22 04:09


Here are some notes that you might want to use to craft your solution:

The easiest way to retrieve a coroutine's exception (or result!) is to await it. asyncio.gather() will create tasks from coroutines and wrap all of them in one encompassing task that will fail if one of the subtasks fails:

    import asyncio
    import random


    async def coro(n):
        print("Start", n)
        await asyncio.sleep(random.uniform(0.2, 0.5))
        if n % 4 == 0:
            raise Exception('fail ({})'.format(n))
        return "OK: {}".format(n)


    async def main():
        tasks = [coro(i) for i in range(10)]
        await asyncio.gather(*tasks)
        print("done")


    # Create the loop explicitly; relying on get_event_loop() to create one
    # is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.create_task(main())
        loop.run_forever()
    finally:
        loop.close()

However, this does not shut down the loop. To stop a running loop, call loop.stop(); change main() as follows:

    async def main():
        tasks = [coro(i) for i in range(10)]
        try:
            await asyncio.gather(*tasks)
        except Exception:
            loop.stop()
            raise
        print("done")
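On Python 3.7 and later, a similar effect can be had without managing the loop by hand. The following is a minimal sketch, assuming asyncio.run() is available; it is not what the snippets above use. The delays and the failing index are made up for illustration. asyncio.run() re-raises whatever main() raises, cancels the tasks that are still running, and closes the loop.

```python
import asyncio


async def coro(n):
    # Hypothetical job: finishes after a short delay; number 3 fails.
    await asyncio.sleep(0.01 * n)
    if n == 3:
        raise RuntimeError("fail ({})".format(n))
    return "OK: {}".format(n)


async def main():
    # gather() propagates the first exception raised by any child.
    # The other children are not cancelled by gather() itself.
    return await asyncio.gather(*(coro(i) for i in range(1, 6)))


def run():
    # asyncio.run() lets the exception escape to the caller after
    # cancelling leftover tasks and closing the loop.
    return asyncio.run(main())
```

If run() is called at the top level of a script and the exception is not caught, the program terminates with the traceback of the failing coroutine, which is what the question asks for.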

Stopping the loop while some long-running coroutines are still running is probably not what you want. You might want to first signal some of your coroutines to shut down using an event:

    import asyncio
    import random


    async def repeat(n):
        print("start", n)
        while not shutting_down.is_set():
            print("repeat", n)
            await asyncio.sleep(random.uniform(1, 3))
        print("done", n)


    async def main():
        print("waiting 6 seconds..")
        await asyncio.sleep(6)
        print("shutting down")
        shutting_down.set()  # not a coroutine!
        print("waiting")
        await asyncio.wait(long_running)
        print("done")
        loop.stop()


    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # The loop= argument of asyncio.Event was removed in Python 3.10.
    shutting_down = asyncio.Event()
    long_running = [loop.create_task(repeat(i + 1)) for i in range(5)]
    try:
        loop.create_task(main())
        loop.run_forever()
    finally:
        loop.close()

If you don't want to await your tasks, you can use an asyncio.Event (or asyncio.Queue) to signal a global error handler to stop the loop:

    import asyncio


    async def fail():
        try:
            print("doing stuff...")
            await asyncio.sleep(0.2)
            print("doing stuff...")
            await asyncio.sleep(0.2)
            print("doing stuff...")
            raise Exception('fail')
        except Exception as e:
            # Attach the exception to the event so the handler can read it.
            error_event.payload = e
            error_event.set()
            raise  # optional


    async def error_handler():
        await error_event.wait()
        e = error_event.payload
        print("Got:", e)
        raise e


    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    error_event = asyncio.Event()
    try:
        loop.create_task(fail())
        loop.run_until_complete(error_handler())
    finally:
        loop.close()

(run_until_complete() is used here for simplicity, but the same approach works with loop.stop() as well.)
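The asyncio.Queue variant mentioned above could look roughly like this. It is a sketch under assumptions: the names worker, errors and run are hypothetical, the delays are arbitrary, and it uses asyncio.run() and asyncio.create_task() (Python 3.7+) rather than a hand-managed loop. A queue carries the exception itself, so nothing has to be attached to the event object as an extra attribute.

```python
import asyncio


async def worker(errors, fail_after):
    """Hypothetical background job that reports its failure through a queue."""
    try:
        await asyncio.sleep(fail_after)
        raise RuntimeError("fail after {}s".format(fail_after))
    except Exception as exc:
        errors.put_nowait(exc)  # hand the exception to the error handler


async def error_handler(errors):
    """Wait for the first reported exception and re-raise it."""
    exc = await errors.get()
    raise exc


async def main():
    errors = asyncio.Queue()  # created inside the running loop
    # Keep references so the tasks are not garbage-collected while pending.
    tasks = [asyncio.create_task(worker(errors, delay))
             for delay in (0.05, 0.01)]
    await error_handler(errors)


def run():
    # The first reported exception escapes from here; asyncio.run()
    # cancels the remaining worker and closes the loop.
    asyncio.run(main())
```

Because error_handler() is the thing being awaited, the first failure reported by any worker ends main(), and the exception propagates out of run() to the caller.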

Udi answered Sep 21 '22 04:09