Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to make tasks in ProcessPoolExecutor behave like daemon process?

Python 3.6.6

Here is code:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


executor_processes = ProcessPoolExecutor(2)


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    loop_ = asyncio.get_event_loop()
    loop_.run_in_executor(executor_processes, calculate)
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")

Output:

while
finish sleep
main_thread is finished
while
while
...

I expect that child process will be terminated like in case when Process is spawned with daemon property like:

import asyncio
import time
import multiprocessing


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    proc = multiprocessing.Process(target=calculate)
    proc.daemon = True
    proc.start()
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")

Output:

while
finish sleep
main_thread is finished

Question: how to change loop_.run_in_executor(executor_processes, calculate) behavior to "daemon-like"?

like image 680
Yuriy Leonov Avatar asked Sep 20 '25 06:09

Yuriy Leonov


1 Answers

The code you show is clearly just a small example to demonstrate what you are hoping to achieve. We don't know your real-world task/problem. But honestly, I'm not convinced you are on the correct path here.

ProcessPoolExecutor is part of the concurrent.futures standard library package. It returns a Future to the caller upon invoking submit(). That Future is a proxy for a result of a computation that has not yet completed. It's a promise; although that term is technically not quite correct in this context. See the Wiki page for the distinction.

This implies, that the computation is expected to complete in finite time and yield a result.
That is why the ThreadPoolExecutor and ProcessPoolExecutor implementations in Python don't allow you to spawn daemonic workers. Demanding a promise for a result that you don't actually want fulfilled does not make much sense.

How can you still achieve your goal?

1 - Subclass ProcessPoolExecutor?
You can intercept the creation and start of new processes to sneak in a p.daemon = True in _adjust_process_count(). However, since concurrent.futures is not designed with indefinitely running tasks in mind, this won't help much. Unlike multiprocessing, concurrent.futures.process defines an exit handler that doesn't consider daemonic processes. It just tries to join() everything, and that can take some time for infinite loops.

2 - Define your own exit handler!
You could do what both, multiprocessing and concurrent.futures.process do: define an exit handler, that cleans up when your Python process is about to shut down. atexit can help with that:

import atexit

executor_processes = ProcessPoolExecutor(2)

def calculate():
    while True:
        print("while")
        time.sleep(1)

def end_processes():
    [proc.terminate() for proc in multiprocessing.active_children()]

async def async_method():
    [...]

if __name__ == '__main__':
    atexit.register(end_processes)
    loop = asyncio.get_event_loop()
    [...]

Note: This will terminate all child processes that are active by the end of the process. If there are child processes that you want to shut down gracefully, keep a handle and do that before the instructions in your code end.
Also note that processes can refuse to honor terminate(). kill() is your last resort.

like image 119
shmee Avatar answered Sep 22 '25 19:09

shmee