
Please explain "Task was destroyed but it is pending!"

I am learning asyncio with Python 3.4.2 and I use it to continuously listen on an IPC bus, while gbulb listens on the DBus.

I created a function listen_to_ipc_channel_layer that continuously listens for incoming messages on the IPC channel and passes the message to message_handler.

I am also listening to SIGTERM and SIGINT. When I send a SIGTERM to the python process running the code you find at the bottom, the script should terminate gracefully.

The problem I am having is the following warning:

    got signal 15: exit
    Task was destroyed but it is pending!
    task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

    Process finished with exit code 0

…with the following code:

    import asyncio
    import gbulb
    import signal
    import asgi_ipc as asgi

    def main():
        asyncio.async(listen_to_ipc_channel_layer())
        loop = asyncio.get_event_loop()

        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, ask_exit)

        # Start listening on the Linux IPC bus for incoming messages
        loop.run_forever()
        loop.close()

    @asyncio.coroutine
    def listen_to_ipc_channel_layer():
        """Listens to the Linux IPC bus for messages"""
        while True:
            message_handler(message=channel_layer.receive(["my_channel"]))
            try:
                yield from asyncio.sleep(0.1)
            except asyncio.CancelledError:
                break

    def ask_exit():
        loop = asyncio.get_event_loop()
        for task in asyncio.Task.all_tasks():
            task.cancel()
        loop.stop()


    if __name__ == "__main__":
        gbulb.install()
        # Connect to the IPC bus
        channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
        main()

I still only understand very little of asyncio, but I think I know what is going on. While waiting for yield from asyncio.sleep(0.1) the signal handler caught the SIGTERM and in that process it calls task.cancel().

Shouldn't this trigger the CancelledError within the while True: loop? (Because it is not, but that is how I understand "Calling cancel() will throw a CancelledError to the wrapped coroutine").

Eventually loop.stop() is called which stops the loop without waiting for either yield from asyncio.sleep(0.1) to return a result or even the whole coroutine listen_to_ipc_channel_layer.

Please correct me if I am wrong.

I think the only thing I need to do is make my program wait for the yield from asyncio.sleep(0.1) to return a result and/or for the coroutine to break out of the while loop and finish.

I believe I confuse a lot of things. Please help me get those things straight so that I can figure out how to gracefully close the event loop without warning.

asked Nov 30 '16 20:11 by Daniel


2 Answers

The problem comes from stopping (and then closing) the loop immediately after cancelling the tasks. As the cancel() docs state:

"This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop."

Take this snippet of code:

    import asyncio
    import signal


    async def pending_doom():
        await asyncio.sleep(2)
        print(">> Cancelling tasks now")
        for task in asyncio.Task.all_tasks():
            task.cancel()

        print(">> Done cancelling tasks")
        asyncio.get_event_loop().stop()


    def ask_exit():
        for task in asyncio.Task.all_tasks():
            task.cancel()


    async def looping_coro():
        print("Executing coroutine")
        while True:
            try:
                await asyncio.sleep(0.25)
            except asyncio.CancelledError:
                print("Got CancelledError")
                break

            print("Done waiting")

        print("Done executing coroutine")
        asyncio.get_event_loop().stop()


    def main():
        # ensure_future() replaces the deprecated asyncio.async() alias,
        # which is a SyntaxError from Python 3.7 on
        asyncio.ensure_future(pending_doom())
        asyncio.ensure_future(looping_coro())

        loop = asyncio.get_event_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, ask_exit)

        loop.run_forever()

        # I had to manually remove the handlers to
        # avoid an exception on BaseEventLoop.__del__
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.remove_signal_handler(sig)


    if __name__ == '__main__':
        main()

Notice that ask_exit cancels the tasks but does not stop the loop. On the next cycle, looping_coro() receives the CancelledError, breaks out of its while loop and stops the loop itself. The output if you interrupt it with Ctrl+C is:

    Executing coroutine
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    ^CGot CancelledError
    Done executing coroutine

By contrast, pending_doom cancels the tasks and stops the loop immediately afterwards, which is what your ask_exit does. If you let the script run until the pending_doom coroutine wakes from its sleep, you get the same warning you're seeing:

    Executing coroutine
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    Done waiting
    >> Cancelling tasks now
    >> Done cancelling tasks
    Task was destroyed but it is pending!
    task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>
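Another way to avoid the warning, different from the snippet above, is to keep loop.stop() in the handler but run the loop again afterwards until the cancelled tasks have finished, and only then close it. The following is a minimal sketch of that idea, assuming Python 3.7 or later (asyncio.all_tasks); request_shutdown is a hypothetical name, and a call_later timer stands in for the real signal so the example runs anywhere.

```python
import asyncio


async def looping_coro(log):
    """Stand-in for a long-running listener."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("listener cancelled")
        raise  # re-raise so the task ends up marked as cancelled


def request_shutdown(loop):
    """What a signal handler might do: cancel every task, then stop."""
    for task in asyncio.all_tasks(loop):
        task.cancel()
    loop.stop()


log = []
loop = asyncio.new_event_loop()
listener = loop.create_task(looping_coro(log))

# Hypothetical stand-in for SIGTERM arriving a little later.
loop.call_later(0.05, request_shutdown, loop)
loop.run_forever()

# The loop stopped before the CancelledError could be delivered.
was_pending = not listener.done()

# Drain: run the loop again until every cancelled task has finished.
pending = asyncio.all_tasks(loop)
if pending:
    loop.run_until_complete(
        asyncio.gather(*pending, return_exceptions=True)
    )
loop.close()
```

With return_exceptions=True, gather collects each task's CancelledError instead of raising it, so the drain step itself does not fail.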
answered Oct 06 '22 00:10 by Yeray Diaz


The warning means that the loop was stopped before it had time to finish all its tasks. The cancel() docs say:

"This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop."

Your approach gives the loop no chance to run that next cycle. To do it properly, move the stop operation into a separate, non-looping coroutine so that the loop can finish the cancelled tasks first.
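The "next cycle" behaviour can be observed directly. This is a small sketch, assuming Python 3.7 or later for asyncio.run; worker and demo are illustrative names, not part of the question's code. It records the order of events around cancel().

```python
import asyncio


async def worker(log):
    """Sleep for a long time; note when CancelledError actually arrives."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("worker saw CancelledError")
        raise


async def demo():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0)      # let the worker start and reach its sleep

    task.cancel()               # only requests the cancellation
    log.append("cancel() returned")
    done_right_after = task.done()

    await asyncio.sleep(0)      # hand control back to the loop for one cycle
    done_one_cycle_later = task.done()
    return log, done_right_after, done_one_cycle_later


log, done_right_after, done_one_cycle_later = asyncio.run(demo())
```

The worker only sees the exception after control returns to the event loop. If loop.stop() takes effect before that cycle runs, the task is left pending.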

The second important point is how the CancelledError is handled. The same docs continue:

"Unlike Future.cancel(), this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.

"Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called)."

So after its cleanup, your coroutine must re-raise the CancelledError for the task to be marked as cancelled.
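To illustrate the difference, here is a short sketch, again assuming Python 3.7 or later; swallows and reraises are made-up names. Both coroutines are cancelled in the same way, but only the one that re-raises ends up with cancelled() returning True.

```python
import asyncio


async def swallows():
    """Catches CancelledError and returns normally."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        return "cleaned up, cancellation swallowed"


async def reraises():
    """Does its cleanup, then lets CancelledError propagate."""
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        raise


async def demo():
    tasks = [asyncio.ensure_future(swallows()),
             asyncio.ensure_future(reraises())]
    await asyncio.sleep(0)          # let both coroutines reach their sleep
    for t in tasks:
        t.cancel()
    right_after = [t.cancelled() for t in tasks]

    # Wait for both tasks to finish without raising here.
    await asyncio.gather(*tasks, return_exceptions=True)
    finished = [t.cancelled() for t in tasks]
    return right_after, finished


right_after, finished = asyncio.run(demo())
```

Neither variant triggers the "destroyed but it is pending" warning as long as the loop gets to run them to completion; the difference is only in how the finished task reports its state.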

Using an extra coroutine to stop the loop is not a problem: it does not loop, so it finishes as soon as it has run.

    import asyncio
    import signal


    def main():
        loop = asyncio.get_event_loop()
        asyncio.ensure_future(listen_to_ipc_channel_layer())

        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, ask_exit)
        loop.run_forever()
        print("Close")
        loop.close()


    @asyncio.coroutine
    def listen_to_ipc_channel_layer():
        while True:
            try:
                print("Running")
                yield from asyncio.sleep(0.1)
            except asyncio.CancelledError:
                print("Break it out")
                raise  # Re-raise so the task is marked as cancelled


    # Stop the loop concurrently
    @asyncio.coroutine
    def exit():
        loop = asyncio.get_event_loop()
        print("Stop")
        loop.stop()


    def ask_exit():
        for task in asyncio.Task.all_tasks():
            task.cancel()
        # Scheduled after the cancellations, so the cancelled tasks
        # get their cycle before the loop stops
        asyncio.ensure_future(exit())


    if __name__ == "__main__":
        main()
answered Oct 06 '22 01:10 by I159