
Proper way to shutdown asyncio tasks

I am writing a tool that connects to a number of UNIX sockets, sends a command to each, and saves the output to the local file system. It repeats this every X seconds. To perform some cleanup when the tool receives a termination signal, I register a function (shutdown) as the handler for signal.SIGHUP and signal.SIGTERM. This function cancels all tasks and then stops the event loop.

My problem is that I get

RuntimeError: Event loop stopped before Future completed

when I send signal.SIGTERM (kill 'pid'). I have read the documentation about cancelling tasks twice, but I haven't spotted what I am doing wrong here.

I also noticed something strange: when I send the termination signal the program is sleeping, and I see in the log that it wakes up and runs the pull_stats() coroutine. You can see this in the first 2 lines of the log.

Log:

    21:53:44,194 [23031] [MainThread:supervisor  ] DEBUG    **sleeping for 9.805s secs**
    21:53:45,857 [23031] [MainThread:pull_stats  ] INFO     pull statistics
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
    21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
    21:53:45,859 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     received stop signal, cancelling tasks...
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,860 [23031] [MainThread:shutdown    ] INFO     True
    21:53:45,860 [23031] [MainThread:shutdown    ] INFO     stopping event loop
    21:53:45,860 [23031] [MainThread:shutdown    ] INFO     bye, exiting...
    Traceback (most recent call last):
      File "./pull.py", line 249, in <module>
        main()
      File "./pull.py", line 245, in main
        supervisor(loop, config)
      File "./pull.py", line 161, in supervisor
        config['pull']['socket-dir'], storage_dir, loop))
      File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
        raise RuntimeError('Event loop stopped before Future completed.')
    RuntimeError: Event loop stopped before Future completed.

Here is the code:

    def shutdown(loop):
        LOGGER.info('received stop signal, cancelling tasks...')
        for task in asyncio.Task.all_tasks():
            LOGGER.info(task.cancel())
        LOGGER.info('stopping event loop')
        loop.stop()
        LOGGER.info('bye, exiting...')


    def write_file(filename, data):
        try:
            with open(filename, 'w') as file_handle:
                file_handle.write(data.decode())
        except OSError as exc:
            return False
        else:
            return True


    @asyncio.coroutine
    def get(socket_file, cmd, storage_dir, loop):
        connect = asyncio.open_unix_connection(socket_file)
        reader, writer = yield from asyncio.wait_for(connect, 1)

        writer.write('{c}\n'.format(c=cmd).encode())
        data = yield from reader.read()
        writer.close()

        filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
        filename = os.path.join(storage_dir, filename)
        result = yield from loop.run_in_executor(None, write_file, filename, data)

        return result


    @asyncio.coroutine
    def pull_stats(socket_dir, storage_dir, loop):
        socket_files = glob.glob(socket_dir + '/*sock*')
        coroutines = [get(socket_file, cmd, storage_dir, loop)
                      for socket_file in socket_files
                      for cmd in CMDS]
        status = yield from asyncio.gather(*coroutines)

        if len(set(status)) == 1 and True in set(status):
            return True
        else:
            return False


    def supervisor(loop, config):
        dst_dir = config.get('pull', 'dst-dir')
        tmp_dst_dir = config.get('pull', 'tmp-dst-dir')

        while True:
            start_time = int(time.time())
            storage_dir = os.path.join(tmp_dst_dir, str(start_time))

            try:
                os.makedirs(storage_dir)
            except OSError as exc:
                msg = "failed to create directory {d}:{e}".format(d=storage_dir,
                                                                  e=exc)
                LOGGER.critical(msg)

            # Launch all connections.
            result = loop.run_until_complete(pull_stats(
                config['pull']['socket-dir'], storage_dir, loop))

            if result:
                try:
                    shutil.move(storage_dir, dst_dir)
                except OSError as exc:
                    LOGGER.critical("failed to move %s to %s: %s", storage_dir,
                                    dst_dir, exc)
                    break
                else:
                    LOGGER.info('statistics are saved in %s', os.path.join(
                        dst_dir, os.path.basename(storage_dir)))
            else:
                LOGGER.critical('failed to pull stats')
                shutil.rmtree(storage_dir)

            sleep = config.getint('pull', 'pull-interval') - (time.time() -
                                                              start_time)
            if 0 < sleep < config.getint('pull', 'pull-interval'):
                time.sleep(sleep)
        loop.close()
        sys.exit(1)


    def main():
        args = docopt(__doc__, version=VERSION)
        config = ConfigParser(interpolation=ExtendedInterpolation())
        config.read_dict(copy.copy(DEFAULT_OPTIONS))
        config.read(args['--file'])

        loop = asyncio.get_event_loop()

        loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
        loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))

        num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None)
        LOGGER.setLevel(num_level)

        supervisor(loop, config)

    # This is the standard boilerplate that calls the main() function.
    if __name__ == '__main__':
        main()
Pavlos Parissis asked Jan 10 '16 21:01




1 Answer

Cancellation is not immediate: the event loop must keep running so that each cancelled task can be resolved with a CancelledError. To make things work, remove loop.stop() from shutdown and handle the exception in supervisor.
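Before the full example, here is a minimal sketch of why cancellation is not immediate. It assumes Python 3.7+ (async/await and asyncio.run), and the names worker and demo are made up for illustration. task.cancel() only requests cancellation; the task is reported as cancelled only after the loop has run it once more.

```python
import asyncio


async def worker():
    # Hypothetical long-running job standing in for a socket read.
    await asyncio.sleep(3600)


async def demo():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)           # give the worker a chance to start

    requested = task.cancel()        # only *requests* cancellation
    before = task.cancelled()        # the loop has not processed it yet

    try:
        await task                   # let the loop deliver CancelledError
    except asyncio.CancelledError:
        pass
    after = task.cancelled()

    return requested, before, after


print(asyncio.run(demo()))  # (True, False, True)
```

Stopping the loop between the cancel() call and the point where the task is allowed to run again is what leaves run_until_complete() with an unfinished Future.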

An important caveat: although you can cancel a Task, cancelling only makes the loop stop waiting for its result and stop handling further events for it. The underlying request/pipe is not stopped.
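A rough way to see this caveat, using a worker thread as a stand-in for the underlying operation (similar to the run_in_executor call in the question). This is only a sketch, assuming Python 3.7+; blocking_write, save and demo are hypothetical names. The task ends up cancelled, but the thread it started still runs to completion.

```python
import asyncio
import threading
import time


def blocking_write(started, finished, log):
    """Stand-in for write_file(); runs in a worker thread."""
    started.set()
    time.sleep(0.1)
    log.append('written')
    finished.set()


async def save(started, finished, log):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, blocking_write, started, finished, log)


async def demo():
    loop = asyncio.get_running_loop()
    started, finished, log = threading.Event(), threading.Event(), []

    task = asyncio.ensure_future(save(started, finished, log))
    # Wait (in another worker thread) until blocking_write has really begun.
    await loop.run_in_executor(None, started.wait, 5)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The task is cancelled, but the thread it started keeps going.
    await loop.run_in_executor(None, finished.wait, 5)
    return task.cancelled(), log


print(asyncio.run(demo()))  # (True, ['written'])
```

If the underlying work holds a resource such as a socket, it is usually worth releasing it explicitly (for example in a try/finally inside the coroutine) rather than relying on cancellation alone.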

Simplified example:

    import asyncio
    import functools
    import logging
    import signal
    import sys
    from concurrent.futures import CancelledError


    def shutdown(loop):
        logging.info('received stop signal, cancelling tasks...')
        for task in asyncio.Task.all_tasks():
            task.cancel()
        logging.info('bye, exiting in a minute...')


    @asyncio.coroutine
    def get(i):
        logging.info('sleep for %d', i)
        yield from asyncio.sleep(i)


    @asyncio.coroutine
    def pull_stats():
        coroutines = [get(i) for i in range(10, 20)]
        status = yield from asyncio.gather(*coroutines)


    def supervisor(loop):
        try:
            while True:
                result = loop.run_until_complete(pull_stats())
        except CancelledError:
            logging.info('CancelledError')
        loop.close()
        sys.exit(1)


    def main():
        logging.getLogger().setLevel(logging.INFO)
        loop = asyncio.get_event_loop()
        loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
        loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
        supervisor(loop)


    if __name__ == '__main__':
        main()

Note that if you cancel only the Future returned by gather(), all of its children are cancelled as well.
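The example above targets Python 3.4. Later versions removed @asyncio.coroutine and asyncio.Task.all_tasks(), and since Python 3.8 asyncio.CancelledError is a separate class from the one in concurrent.futures. The following is a sketch of the same idea for Python 3.8+, not a drop-in replacement. The names run, install_handlers, the cancelled list and the cancel_after parameter are assumptions added for illustration. It cancels only the top-level task that awaits gather() and records which children were cancelled as a result.

```python
import asyncio
import signal


async def get(i, cancelled):
    try:
        await asyncio.sleep(i)
    except asyncio.CancelledError:
        cancelled.append(i)          # record that this child was cancelled
        raise                        # always re-raise so cancellation completes
    return i


async def pull_stats(delays, cancelled):
    return await asyncio.gather(*(get(i, cancelled) for i in delays))


def install_handlers(loop, task):
    """Cancel `task` on SIGHUP/SIGTERM (Unix only, main thread only)."""
    for sig in (signal.SIGHUP, signal.SIGTERM):
        loop.add_signal_handler(sig, task.cancel)


def run(loop, delays, cancel_after=None):
    cancelled = []
    main_task = loop.create_task(pull_stats(delays, cancelled))
    if cancel_after is not None:
        # Simulates a signal arriving after `cancel_after` seconds.
        loop.call_later(cancel_after, main_task.cancel)
    try:
        # No loop.stop() anywhere: the loop keeps running until the
        # cancellation has propagated and the task has finished.
        loop.run_until_complete(main_task)
        outcome = 'completed'
    except asyncio.CancelledError:
        outcome = 'cancelled'
    finally:
        loop.close()
    return outcome, sorted(cancelled)
```

For example, run(asyncio.new_event_loop(), (10, 11, 12), cancel_after=0.05) returns ('cancelled', [10, 11, 12]). In a real tool you would call install_handlers(loop, main_task) instead of passing cancel_after.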

As for the sleep behaviour

Receiving a signal or interrupt causes the program to resume execution. So when the process receives SIGTERM and a handler is set, Python lets you handle it: the thread is resumed and the signal handler is called. Because of how the event loop implements signal handling, the program keeps running after it wakes up.
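Callbacks registered with loop.add_signal_handler() run only while the event loop itself is running. One possible way to avoid the wake-up-then-pull behaviour, offered here as a suggestion rather than as part of the fix above, is to move the pause into a coroutine and use asyncio.sleep() instead of time.sleep(). In this sketch (Python 3.7+), supervise, run and the inner pull are hypothetical names, and call_later stands in for a real signal.

```python
import asyncio


async def supervise(pull, interval):
    """Call pull() every `interval` seconds until cancelled."""
    while True:
        await pull()
        # Unlike time.sleep(), this yields to the event loop, so signal
        # callbacks can run (and cancel this task) during the pause.
        await asyncio.sleep(interval)


def run(interval, cancel_after):
    calls = []

    async def pull():
        calls.append(len(calls))     # stand-in for pull_stats()

    loop = asyncio.new_event_loop()
    task = loop.create_task(supervise(pull, interval))
    # Stand-in for: loop.add_signal_handler(signal.SIGTERM, task.cancel)
    loop.call_later(cancel_after, task.cancel)
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        pass
    finally:
        loop.close()
    return len(calls)


print(run(interval=10, cancel_after=0.05))  # 1
```

The cancellation arrives during the pause and ends the task right away, so pull() is not started a second time.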

kwarunek answered Oct 14 '22 21:10
