I am writing a tool that connects to a number of UNIX sockets, sends a command to each and saves the output in the local file system. It repeats this every X seconds. To perform some cleanup when the tool receives a termination signal, I register a function (shutdown) as the handler for signal.SIGHUP and signal.SIGTERM. This function cancels all tasks and then stops the event loop.
My problem is that I get
RuntimeError: Event loop stopped before Future completed
when I send signal.SIGTERM (kill 'pid'). I have read the documentation about cancelling tasks twice, but I haven't spotted what I am doing wrong here.
I also noticed something strange: when I send the termination signal the program is sleeping, yet I see in the log that it wakes up and runs the pull_stats() coroutine. You can see this in the first 2 lines of the log.
Log:
21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats ] INFO pull statistics
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,859 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,859 [23031] [MainThread:shutdown ] INFO received stop signal, cancelling tasks...
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,860 [23031] [MainThread:shutdown ] INFO True
21:53:45,860 [23031] [MainThread:shutdown ] INFO stopping event loop
21:53:45,860 [23031] [MainThread:shutdown ] INFO bye, exiting...
Traceback (most recent call last):
  File "./pull.py", line 249, in <module>
    main()
  File "./pull.py", line 245, in main
    supervisor(loop, config)
  File "./pull.py", line 161, in supervisor
    config['pull']['socket-dir'], storage_dir, loop))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.
Here is the code:
def shutdown(loop):
    LOGGER.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        LOGGER.info(task.cancel())
    LOGGER.info('stopping event loop')
    loop.stop()
    LOGGER.info('bye, exiting...')


def write_file(filename, data):
    try:
        with open(filename, 'w') as file_handle:
            file_handle.write(data.decode())
    except OSError as exc:
        return False
    else:
        return True


@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop):
    connect = asyncio.open_unix_connection(socket_file)
    reader, writer = yield from asyncio.wait_for(connect, 1)

    writer.write('{c}\n'.format(c=cmd).encode())
    data = yield from reader.read()
    writer.close()

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
    filename = os.path.join(storage_dir, filename)
    result = yield from loop.run_in_executor(None, write_file, filename, data)

    return result


@asyncio.coroutine
def pull_stats(socket_dir, storage_dir, loop):
    socket_files = glob.glob(socket_dir + '/*sock*')
    coroutines = [get(socket_file, cmd, storage_dir, loop)
                  for socket_file in socket_files
                  for cmd in CMDS]
    status = yield from asyncio.gather(*coroutines)

    if len(set(status)) == 1 and True in set(status):
        return True
    else:
        return False


def supervisor(loop, config):
    dst_dir = config.get('pull', 'dst-dir')
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir')

    while True:
        start_time = int(time.time())
        storage_dir = os.path.join(tmp_dst_dir, str(start_time))

        try:
            os.makedirs(storage_dir)
        except OSError as exc:
            msg = "failed to create directory {d}:{e}".format(d=storage_dir, e=exc)
            LOGGER.critical(msg)

        # Launch all connections.
        result = loop.run_until_complete(pull_stats(
            config['pull']['socket-dir'], storage_dir, loop))

        if result:
            try:
                shutil.move(storage_dir, dst_dir)
            except OSError as exc:
                LOGGER.critical("failed to move %s to %s: %s", storage_dir, dst_dir, exc)
                break
            else:
                LOGGER.info('statistics are saved in %s', os.path.join(
                    dst_dir, os.path.basename(storage_dir)))
        else:
            LOGGER.critical('failed to pull stats')
            shutil.rmtree(storage_dir)

        sleep = config.getint('pull', 'pull-interval') - (time.time() - start_time)
        if 0 < sleep < config.getint('pull', 'pull-interval'):
            time.sleep(sleep)

    loop.close()
    sys.exit(1)


def main():
    args = docopt(__doc__, version=VERSION)
    config = ConfigParser(interpolation=ExtendedInterpolation())
    config.read_dict(copy.copy(DEFAULT_OPTIONS))
    config.read(args['--file'])

    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None)
    LOGGER.setLevel(num_level)

    supervisor(loop, config)


# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
    main()
Cancellation is not immediate: the event loop has to keep running so that each cancelled task can be resolved with a CancelledError exception. Remove loop.stop() from shutdown and handle the exception in supervisor to make things work.
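To make the "not immediate" part concrete, here is a minimal sketch. It assumes Python 3.7+ and uses async/await instead of the @asyncio.coroutine style from the question; worker and main are hypothetical names chosen for illustration. It shows that cancel() only requests cancellation, and that the task becomes cancelled once the loop has had a chance to run it again.

```python
import asyncio


async def worker():
    # Stand-in for a long-running coroutine such as get().
    await asyncio.sleep(60)


async def main():
    task = asyncio.ensure_future(worker())
    await asyncio.sleep(0)                    # let the worker start

    requested = task.cancel()                 # only *requests* cancellation
    cancelled_right_away = task.cancelled()   # the loop has not run the task yet

    try:
        await task                            # the loop now delivers CancelledError
    except asyncio.CancelledError:
        pass

    return requested, cancelled_right_away, task.cancelled()


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Stopping the loop right after calling cancel(), as the original shutdown does, skips the step in which the loop delivers the exception, which is why run_until_complete complains.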
One important caveat: although you can cancel a Task, cancelling only stops the loop from watching/waiting for its result and from handling further events for it. The underlying request/pipe is not stopped.
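One place where this caveat is easy to observe is work handed to run_in_executor, as write_file is in the question. The sketch below is an illustration under assumptions, not the question's code: it assumes Python 3.7+, and blocking_write and save are hypothetical stand-ins. The task is cancelled, but the function running in the worker thread still runs to completion.

```python
import asyncio
import threading
import time


async def main():
    finished = threading.Event()

    def blocking_write():
        # Hypothetical stand-in for write_file(): blocking work in a thread.
        time.sleep(0.3)
        finished.set()

    async def save():
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, blocking_write)

    task = asyncio.ensure_future(save())
    await asyncio.sleep(0.05)        # the worker thread is running by now
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    finished_at_cancel = finished.is_set()

    # Wait in another worker thread until the original work completes.
    loop = asyncio.get_running_loop()
    finished_later = await loop.run_in_executor(None, finished.wait, 2)

    return task.cancelled(), finished_at_cancel, finished_later


if __name__ == '__main__':
    print(asyncio.run(main()))
```

If the underlying work must really stop, it needs its own mechanism, for example a flag that the blocking function checks.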
Simplified example:
import asyncio
import functools
import logging
import signal
import sys
from concurrent.futures import CancelledError


def shutdown(loop):
    logging.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        task.cancel()
    logging.info('bye, exiting in a minute...')


@asyncio.coroutine
def get(i):
    logging.info('sleep for %d', i)
    yield from asyncio.sleep(i)


@asyncio.coroutine
def pull_stats():
    coroutines = [get(i) for i in range(10, 20)]
    status = yield from asyncio.gather(*coroutines)


def supervisor(loop):
    try:
        while True:
            result = loop.run_until_complete(pull_stats())
    except CancelledError:
        logging.info('CancelledError')
    loop.close()
    sys.exit(1)


def main():
    logging.getLogger().setLevel(logging.INFO)
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
    supervisor(loop)


if __name__ == '__main__':
    main()
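The example above targets Python 3.4. asyncio.Task.all_tasks() and @asyncio.coroutine have since been removed from the standard library, so here is a rough equivalent for newer interpreters. This is a sketch under assumptions (Python 3.7+, a UNIX platform, run from the main thread), not the original author's code. The demo function is a hypothetical helper that sends SIGTERM to the current process so that the shutdown path can be exercised without an external kill.

```python
import asyncio
import os
import signal


async def get(i):
    await asyncio.sleep(i)


async def pull_stats():
    await asyncio.gather(*(get(i) for i in range(10, 20)))


async def supervisor():
    loop = asyncio.get_running_loop()
    main_task = asyncio.current_task()
    # On a signal, only request cancellation; the loop keeps running so the
    # CancelledError can be delivered.
    for sig in (signal.SIGHUP, signal.SIGTERM):
        loop.add_signal_handler(sig, main_task.cancel)
    try:
        while True:
            await pull_stats()
    except asyncio.CancelledError:
        # Cleanup would go here.
        return 'cancelled'


def demo():
    # Hypothetical helper: deliver SIGTERM to this process after 0.1 s.
    async def runner():
        asyncio.get_running_loop().call_later(
            0.1, os.kill, os.getpid(), signal.SIGTERM)
        return await supervisor()
    return asyncio.run(runner())


if __name__ == '__main__':
    print(demo())
```

Here the handler cancels only the task that runs supervisor; the cancellation reaches the get() tasks through gather.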
Note that if you cancel only gather's Future, all of its children are cancelled as well.
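A small sketch of that propagation, assuming Python 3.7+ (the names children and group are only illustrative): cancelling the future returned by gather cancels every child task it wraps.

```python
import asyncio


async def main():
    children = [asyncio.ensure_future(asyncio.sleep(60)) for _ in range(3)]
    group = asyncio.gather(*children)
    await asyncio.sleep(0)          # let the children start sleeping

    group.cancel()                  # cancel only the gather future
    try:
        await group
    except asyncio.CancelledError:
        pass

    return [child.cancelled() for child in children]


if __name__ == '__main__':
    print(asyncio.run(main()))
```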
And the sleep thing
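The receipt of any signal or interrupt causes the program to resume execution. When the process receives SIGTERM and a handler is set, Python lets you handle it: the sleeping thread is resumed and the signal handler is called. Because of how the event loop implements signal handling, a callback registered with add_signal_handler only runs while the loop itself is running, so after waking up the supervisor carries on with the next pull_stats() call, and shutdown is called once the loop is running again.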