Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Graceful shutdown of asyncio coroutines

I'm currently having problems closing asyncio coroutines during the shutdown CTRL-C of an application. The following code is a stripped down version of what I have right now:

#!/usr/bin/env python # -*- coding: UTF-8 -*- import asyncio import time import functools import signal   class DummyProtocol(asyncio.Protocol):      def __init__(self, *args, **kwargs):         self._shutdown = asyncio.Event()         self._response = asyncio.Queue(maxsize=1)         super().__init__(*args, **kwargs)      def connection_made(self, transport):         self.transport = transport      def close(self):         print("Closing protocol")         self._shutdown.set()      def data_received(self, data):          #data = b'OK MPD '          # Start listening for commands after a successful handshake         if data.startswith(b'OK MPD '):             print("Ready for sending commands")             self._proxy_task = asyncio.ensure_future(self._send_commands())             return          # saving response for later consumption in self._send_commands         self._response.put_nowait(data)      async def _send_commands(self):          while not self._shutdown.is_set():              print("Waiting for commands coming in ...")              command = None              # listen for commands coming in from the global command queue. Only blocking 1sec.             try:                 command = await asyncio.wait_for(cmd_queue.get(), timeout=1)             except asyncio.TimeoutError:                 continue              # sending the command over the pipe             self.transport.write(command)              # waiting for the response. Blocking until response is complete.             res = await self._response.get()             # put it into the global response queue             res_queue.put_nowait(res)   async def connect(loop):     c = lambda: DummyProtocol()     t = asyncio.Task(loop.create_connection(c, '192.168.1.143', '6600'))     try:         # Wait for 3 seconds, then raise TimeoutError         trans, proto = await asyncio.wait_for(t, timeout=3)         print("Connected to <192.168.1.143:6600>.")         return proto     except (asyncio.TimeoutError, OSError) as e:         print("Could not connect to <192.168.1.143:6600>. Trying again ...")         if isinstance(e, OSError):             log.exception(e)   def shutdown(proto, loop):     # http://stackoverflow.com/a/30766124/1230358     print("Shutdown of DummyProtocol initialized ...")     proto.close()     # give the coros time to finish     time.sleep(2)      # cancel all other tasks     # for task in asyncio.Task.all_tasks():     #    task.cancel()      # stopping the event loop     if loop:         print("Stopping event loop ...")         loop.stop()      print("Shutdown complete ...")       if __name__ == "__main__":      loop = asyncio.get_event_loop()      cmd_queue = asyncio.Queue()     res_queue = asyncio.Queue()      dummy_proto = loop.run_until_complete(connect(loop))      for signame in ('SIGINT','SIGTERM'):         loop.add_signal_handler(getattr(signal, signame), functools.partial(shutdown, dummy_proto, loop))      try:         loop.run_forever()     except KeyboardInterrupt:         pass     finally:         loop.close() 

what gives me the following output if CTRL-C is pressed:

Connected to <192.168.1.143:6600>. Ready for sending commands Waiting for commands coming in ... Waiting for commands coming in ... Waiting for commands coming in ... Waiting for commands coming in ... ^CShutdown of DummyProtocol initialized ... Closing protocol Stopping event loop ... Shutdown complete ... Task was destroyed but it is pending! task: <Task pending coro=<DummyProtocol._send_commands() running at ./dummy.py:45> wait_for=<Future pending cb=[Task._wakeup()]>> Task was destroyed but it is pending! task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:344]> Exception ignored in: <generator object Queue.get at 0x10594b468> Traceback (most recent call last):   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py", line 170, in get   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon   File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed RuntimeError: Event loop is closed 

I'm not very experienced with asyncio, so I'm pretty sure that I'm missing something important here. What really gives me headaches is the part of the output after Shutdown complete .... Beginning with Task was destroyed but it is pending!, I have to admit that I have no idea what's going on. I had a look on other questions but couldn't get it to work. So, why is this code outputting stuff like Task was destroyed but it is pending! aso.and how can cleany close the coroutines?

Thank's for your help!

like image 635
hetsch Avatar asked May 24 '16 15:05

hetsch


People also ask

How do I stop Asyncio from running?

Run an asyncio Event Loop run_until_complete(<some Future object>) – this function runs a given Future object, usually a coroutine defined by the async / await pattern, until it's complete. run_forever() – this function runs the loop forever. stop() – the stop function stops a running loop.

How do you stop a running event loop?

Run the event loop until stop() is called. If stop() is called before run_forever() is called, the loop will poll the I/O selector once with a timeout of zero, run all callbacks scheduled in response to I/O events (and those that were already scheduled), and then exit.

Is coroutine deprecated?

coroutine was also deprecated in python 3.7 and scheduled for removal in python 3.10. It already issues a deprecation warning if used.


2 Answers

What does Task was destroyed but it is pending! mean?

If at the moment your program finished some of asyncio tasks still not finished, you'll get this warning. This warning is needed because some task that are running may not correctly free some resources.

There're two common ways to solve it:

  1. You can wait while tasks finished themselves
  2. You can cancel tasks and wait while them finished

Asyncio and blocking synchronous operations

Let's look at you code:

def shutdown(proto, loop):     print("Shutdown of DummyProtocol initialized ...")     proto.close()      time.sleep(2)     # ... 

time.sleep(2) - this line won't give coroutines time to finished. It'll just freeze all your program for two second. Nothing will happen during this time.

It happens because your event loop is running in same process where you call time.sleep(2). You should never call long running synchronous operations this way in your asyncio programs. Please read this answer to see how async code works.

How can we wait tasks finished

Let's try to modify shutdown function. This is not async function, you can't await something inside it. To execute some async code we need to do it manually: stop currently running loop (since it's already running), create some async function to await tasks finished, pass this function to be executed in event loop.

def shutdown(proto, loop):     print("Shutdown of DummyProtocol initialized ...")      # Set shutdown event:      proto.close()      # Stop loop:     loop.stop()      # Find all running tasks:     pending = asyncio.Task.all_tasks()      # Run loop until tasks done:     loop.run_until_complete(asyncio.gather(*pending))      print("Shutdown complete ...")     

You can also just cancel tasks and await them finished. See this answer for details.

Where to place clean up operations

I'm not too familiar with signals, but do you really need it to catch CTRL-C? Whenever KeyboardInterrupt happens it will thrown at the line where you run the event loop (in you code it's loop.run_forever()). I might be wrong here, but a common way to handle this situation is to place all cleanup operations in a finally block.

For example, you can see how aiohttp does it:

try:     loop.run_forever() except KeyboardInterrupt:  # pragma: no branch     pass finally:     srv.close()     loop.run_until_complete(srv.wait_closed())     loop.run_until_complete(app.shutdown())     loop.run_until_complete(handler.finish_connections(shutdown_timeout))     loop.run_until_complete(app.cleanup()) loop.close() 
like image 50
Mikhail Gerasimov Avatar answered Oct 11 '22 06:10

Mikhail Gerasimov


To complete the accepted answer, you can use aiorun which handles this problem for you pretty well: https://github.com/cjrh/aiorun

like image 36
mickours Avatar answered Oct 11 '22 06:10

mickours