Context: I use a concurrent.futures.ProcessPoolExecutor to execute code that I want to hot-reload with importlib.reload (yes, I know importlib.reload has caveats). To get this to work, I imagine I would have to execute the importlib.reload in every multiprocessing process that is managed by the process pool.

Is there a way to submit something to all processes in a process pool?
I don't know how this will play out with the hot reloading attempt you mentioned, but the general question you really asked is answerable.
Is there a way to submit something to all processes in a process pool?
The challenge here lies in ensuring that really all processes get this something once and only once, and that no further execution takes place until every process has received it. You can get this kind of necessary synchronization with the help of a multiprocessing.Barrier(parties[, action[, timeout]]). The barrier will hold back parties calling barrier.wait() until every party has done so, and then release them all at once.
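
To illustrate just the barrier semantics in isolation, here is a minimal sketch with plain worker processes (the names and sleep times are only for demonstration):

import multiprocessing as mp
import time

def worker(barrier, i):
    time.sleep(i)  # let the workers arrive at different times
    print(f"worker {i} waiting at the barrier")
    barrier.wait()  # blocks until all three parties have called wait()
    print(f"worker {i} released")

if __name__ == '__main__':
    barrier = mp.Barrier(3)
    procs = [mp.Process(target=worker, args=(barrier, i)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

The same mechanism, handed to the pool workers through the initializer of the ProcessPoolExecutor, looks like this: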
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):  # simulate some CPU-bound work
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()  # park this worker until every worker has arrived here
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier  # make the barrier a global in each worker


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
        MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:

        print(list(executor.map(foo, range(10))))

        # then submit something to all processes: one task per worker;
        # the barrier guarantees every worker runs it exactly once
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))
Example Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0
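
Applied to your actual use case, the dummy reload function above could really call importlib.reload. A sketch, assuming a hypothetical plugin module my_plugin that every worker has already imported, with the barrier global set up by init_barrier as before:

import importlib
import multiprocessing as mp

import my_plugin  # hypothetical module name; replace with your own

def reload_plugin():
    # Re-executes my_plugin's source in this worker process, then parks the
    # worker at the barrier until all other workers have reloaded as well.
    importlib.reload(my_plugin)
    print(f"{mp.current_process().name} --- reloaded {my_plugin.__name__}")
    barrier.wait()

You would submit it exactly like the dummy above: futures = [executor.submit(reload_plugin) for _ in range(MAX_WORKERS)]. All the usual importlib.reload caveats from your question still apply, of course.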
If you are okay with keeping barrier a global and multiprocessing.get_context()._name returns "fork", you don't need to use the initializer, because the parent's globals will be inherited by and accessible in the forked child processes.
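
For illustration, a sketch of that fork-only variant (it assumes a platform where the default start method is "fork", e.g. Linux; under "spawn" or "forkserver" each child would end up with its own, unshared barrier):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

MAX_WORKERS = 4
barrier = mp.Barrier(MAX_WORKERS)  # module-level global, inherited on fork

def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()  # the inherited global; no initializer needed
    print(f"{mp.current_process().name} --- released.")

if __name__ == '__main__':
    assert mp.get_start_method() == "fork"  # public equivalent of get_context()._name
    with ProcessPoolExecutor(MAX_WORKERS) as executor:
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()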