
How to terminate loop.run_in_executor with ProcessPoolExecutor gracefully?

Shortly after starting the program, SIGINT (Ctrl+C) is sent.

import asyncio
import concurrent.futures
from time import sleep

def blocking_task():
    sleep(3)

async def main():
    exe = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(exe, blocking_task) for i in range(3)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('ctrl + c')

With max_workers less than or equal to the number of tasks, everything works. But if max_workers is greater, the output of the above code is as follows:

Process ForkProcess-4:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
ctrl + c

I would like to catch the exception (KeyboardInterrupt) only once and ignore or mute the other exception(s) in the process pool, but how?


Update (extra credit):

  • Can you explain the reason for the multiple exceptions?
  • Does adding a signal handler work on Windows?
  • If not, is there a solution that works without a signal handler?
Asked Sep 03 '20 17:09 by linkjumper


1 Answer

You can use the initializer parameter of ProcessPoolExecutor to install a handler for SIGINT in each process.

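Update: On Unix, a newly created process becomes a member of its parent's process group. When SIGINT is generated with Ctrl+C, the terminal sends it to the entire foreground process group, so every worker process receives it as well as the parent. The traceback in the question comes from a worker that was idle, blocked in call_queue.get(), when the signal arrived.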
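The process-group behaviour described above can be checked with a small sketch. It is Unix-only and uses subprocess rather than a process pool, purely to keep the check short; the helper name child_pgid is made up for this illustration. It compares the process group ID reported by a child with that of the parent, and shows that a child started in its own session ends up in a different group.

```python
import os
import subprocess
import sys


def child_pgid(new_session: bool = False) -> int:
    """Start a child Python process and return the process group ID it reports.

    Hypothetical helper for illustration only; Unix-only (uses os.getpgrp).
    """
    out = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getpgrp())"],
        capture_output=True,
        text=True,
        check=True,
        # True makes the child call setsid(), which also gives it a new process group.
        start_new_session=new_session,
    )
    return int(out.stdout.strip())


if __name__ == "__main__":
    print("parent group:      ", os.getpgrp())
    print("child, inherited:  ", child_pgid())
    print("child, new session:", child_pgid(new_session=True))
```

A child that inherits the group would receive a Ctrl+C from the terminal together with the parent; one placed in a new session would not. The answer's own example, which installs a SIGINT handler in every worker, follows.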

import asyncio
import concurrent.futures
import os
import signal
import sys
from time import sleep


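def handler(signum, frame):
    # Runs in the worker that received SIGINT: report it and exit quietly.
    print('SIGINT for PID=', os.getpid())
    sys.exit(0)


def init():
    # Called once in each worker process when it starts.
    signal.signal(signal.SIGINT, handler)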


def blocking_task():
    sleep(15)


async def main():
    exe = concurrent.futures.ProcessPoolExecutor(max_workers=5, initializer=init)
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(exe, blocking_task) for i in range(2)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('ctrl + c')

Ctrl-C shortly after start:

^CSIGINT for PID= 59942
SIGINT for PID= 59943
SIGINT for PID= 59941
SIGINT for PID= 59945
SIGINT for PID= 59944
ctrl + c
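If the goal is only to mute the workers rather than have them report and exit, a possible variation is to make each worker ignore SIGINT by installing signal.SIG_IGN in the initializer. This is a sketch under that assumption, not part of the original answer; the name ignore_sigint is chosen for this example.

```python
import asyncio
import concurrent.futures
import signal
from time import sleep


def ignore_sigint():
    """Worker initializer: make this process ignore SIGINT (Ctrl+C)."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def blocking_task():
    sleep(15)


async def main():
    exe = concurrent.futures.ProcessPoolExecutor(max_workers=5, initializer=ignore_sigint)
    loop = asyncio.get_running_loop()
    tasks = [loop.run_in_executor(exe, blocking_task) for _ in range(2)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Only the parent process sees the interrupt.
        print('ctrl + c')
```

With this variant the workers are not stopped by Ctrl+C, so tasks that are already running continue until they finish, and the interpreter may wait for them at exit. The handler-based version above makes the workers exit immediately instead.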
Answered Oct 04 '22 13:10 by alex_noname