How to shutdown process with event loop and executor

Consider the following program.

import asyncio
import multiprocessing
from multiprocessing import Queue
from concurrent.futures.thread import ThreadPoolExecutor
import sys


def main():
    executor = ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    # comment the following line and the shutdown will work smoothly
    asyncio.ensure_future(print_some(executor))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("shutting down")
        executor.shutdown()
        loop.stop()
        loop.close()
        sys.exit()


async def print_some(executor):
    print("Waiting...Hit CTRL+C to abort")
    queue = Queue()
    loop = asyncio.get_event_loop()
    some = await loop.run_in_executor(executor, queue.get)
    print(some)


if __name__ == '__main__':
    main()

All I want is a graceful shutdown when I hit CTRL+C. However, the executor thread seems to prevent that (even though I do call shutdown()).

asked Aug 29 '18 by Christoph

1 Answer

You need to send a poison pill to make the workers stop listening on the queue.get call. Worker threads in the ThreadPoolExecutor pool will block Python from exiting if they have active work. There's a comment in the source code that describes the reasoning for this behavior:

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

Here's a complete example that exits cleanly:

import asyncio
from multiprocessing import Queue
from concurrent.futures.thread import ThreadPoolExecutor


def main():
    executor = ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    fut = asyncio.ensure_future(print_some(executor))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("shutting down")
        queue.put(None)  # Poison pill
        loop.run_until_complete(fut)
        executor.shutdown()
        loop.stop()
        loop.close()


async def print_some(executor):
    print("Waiting...Hit CTRL+C to abort")
    loop = asyncio.get_event_loop()
    some = await loop.run_in_executor(executor, queue.get)
    print(some)


queue = None
if __name__ == '__main__':
    queue = Queue()
    main()

The run_until_complete(fut) call is needed to avoid a warning about a pending task hanging around when the asyncio event loop exits. If you don't care about that warning, you can leave the call out.

answered Nov 10 '22 by dano