Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Submit code for execution to all processes in a concurrent.futures.ProcessPool

Context:

  • A Python application server that uses a concurrent.futures.process.ProcessPool to execute code
  • We sometimes want to hot reload imported code without restarting the entire server process

(yes I know importlib.reload has caveats)

To get this to work I imagine I would have to execute the importlib.reload in every multiprocessing process that is managed by the process pool.

Is there a way to submit something to all processes in a process pool?

like image 814
codeape Avatar asked Oct 16 '20 16:10

codeape


People also ask

What are concurrent futures?

The concurrent. futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor , or separate processes, using ProcessPoolExecutor .

How does concurrent futures ThreadPoolExecutor work?

ThreadPoolExecutor Methods : submit(fn, *args, **kwargs): It runs a callable or a method and returns a Future object representing the execution state of the method. map(fn, *iterables, timeout = None, chunksize = 1) : It maps the method and iterables together immediately and will raise an exception concurrent. futures.


1 Answers

I don't know how this will play out with the hot reloading attempt you mentioned, but the general question you really asked is answerable.

Is there a way to submit something to all processes in a process pool?

The challenge here lies in assuring that really all processes get this something once and only once and no further execution takes place until every process got it.

You can get this type of necessary synchronization with help of a multiprocessing.Barrier(parties[, action[, timeout]]). The barrier will hold back parties calling barrier.wait() until every party has done so and then release them all at once.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

Example Output:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

If you are okay with keeping barrier a global and multiprocessing.get_context()._name returns "fork", you don't need to use the initializer because globals will be inherited and accessible through forking.

like image 156
Darkonaut Avatar answered Oct 23 '22 09:10

Darkonaut