Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

When using asyncio, how do you allow all running tasks to finish before shutting down the event loop

I have the following code:

@asyncio.coroutine def do_something_periodically():     while True:         asyncio.async(my_expensive_operation())         yield from asyncio.sleep(my_interval)         if shutdown_flag_is_set:             print("Shutting down")             break 

I run this function until complete. The problem occurs when shutdown is set - the function completes and any pending tasks are never run.

This is the error:

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>> 

How do I schedule a shutdown correctly?

To give some context, I'm writing a system monitor which reads from /proc/stat every 5 seconds, computes the cpu usage in that period, and then sends the result to a server. I want to keep scheduling these monitoring jobs until I receive sigterm, when I stop scheduling, wait for all current jobs to finish, and exit gracefully.

like image 409
derekdreery Avatar asked Jan 06 '15 10:01

derekdreery


People also ask

How do I close all Asyncio tasks?

SIGTERM signals. This function cancels all tasks and then closes the event loop. when I send signal. SIGTERM(kill 'pid').

How do I close Asyncio event loop?

stop() – the stop function stops a running loop. is_running() – this function checks if the event loop is currently running or not. is_closed() – this function checks if the event loop is closed or not. close() – the close function closes the event loop.

How do I close a running event loop?

Run the event loop until stop() is called. If stop() is called before run_forever() is called, the loop will poll the I/O selector once with a timeout of zero, run all callbacks scheduled in response to I/O events (and those that were already scheduled), and then exit.

What does await do in Asyncio?

The keyword await passes function control back to the event loop. (It suspends the execution of the surrounding coroutine.) If Python encounters an await f() expression in the scope of g() , this is how await tells the event loop, “Suspend execution of g() until whatever I'm waiting on—the result of f() —is returned.


2 Answers

You can retrieve unfinished tasks and run the loop again until they finished, then close the loop or exit your program.

pending = asyncio.all_tasks() loop.run_until_complete(asyncio.gather(*pending)) 
  • pending is a list of pending tasks.
  • asyncio.gather() allows to wait on several tasks at once.

If you want to ensure all the tasks are completed inside a coroutine (maybe you have a "main" coroutine), you can do it this way, for instance:

async def do_something_periodically():     while True:         asyncio.create_task(my_expensive_operation())         await asyncio.sleep(my_interval)         if shutdown_flag_is_set:             print("Shutting down")             break      await asyncio.gather(*asyncio.all_tasks()) 

Also, in this case, since all the tasks are created in the same coroutine, you already have access to the tasks:

async def do_something_periodically():     tasks = []     while True:         tasks.append(asyncio.create_task(my_expensive_operation()))         await asyncio.sleep(my_interval)         if shutdown_flag_is_set:             print("Shutting down")             break      await asyncio.gather(*tasks) 
like image 55
Martin Richard Avatar answered Sep 19 '22 18:09

Martin Richard


As of Python 3.7 the above answer uses multiple deprecated APIs (asyncio.async and Task.all_tasks,@asyncio.coroutine, yield from, etc.) and you should rather use this:

import asyncio   async def my_expensive_operation(expense):     print(await asyncio.sleep(expense, result="Expensive operation finished."))   async def do_something_periodically(expense, interval):     while True:         asyncio.create_task(my_expensive_operation(expense))         await asyncio.sleep(interval)   loop = asyncio.get_event_loop() coro = do_something_periodically(1, 1)  try:     loop.run_until_complete(coro) except KeyboardInterrupt:     coro.close()     tasks = asyncio.all_tasks(loop)     expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}     loop.run_until_complete(asyncio.gather(*expensive_tasks)) 
like image 26
throws_exceptions_at_you Avatar answered Sep 23 '22 18:09

throws_exceptions_at_you