
Launch concurrent.futures.ProcessPoolExecutor with initialization?

I'm planning to use concurrent.futures.ProcessPoolExecutor to parallelize the execution of functions. According to the documentation, the executor's map method only accepts a plain function. My actual situation involves an initialization step (loading data) that must run before the 'to-be-parallelized' function executes. How do I arrange that?

The 'to-be-parallelized' function is called many times in a loop. I don't want the initialization to be repeated on every call.

In other words, there is an init function that produces output needed by the to-be-parallelized function. Each child process should have its own copy of that output, because the function depends on it.

He Shiming asked Aug 21 '15 01:08



2 Answers

If using Python 3.7 or above, use RuRo's answer below. This answer is only relevant for earlier Python releases where concurrent.futures did not have support for passing an initializer function.


It sounds like you're looking for an equivalent to the initializer/initargs options that multiprocessing.Pool takes. Currently, that behavior doesn't exist for concurrent.futures.ProcessPoolExecutor, though there is a patch waiting for review that adds that behavior.

So, you can either use multiprocessing.Pool (which might be fine for your use case), wait for that patch to get merged and released (you might be waiting a while :)), or roll your own solution. It turns out it's not too hard to write a wrapper function for map that takes an initializer but only calls it once per process:

import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial

inited = False
initresult = None

def initwrapper(initfunc, initargs, f, x):
    # This will be called in the child. inited
    # will be False the first time it's called, but then
    # remain True every other time it's called in a given
    # worker process.
    global inited, initresult
    if not inited:
        inited = True
        initresult = initfunc(*initargs)
    return f(x)

def do_init(a,b):
    print('ran init {} {}'.format(a,b))
    return os.getpid() # Just to demonstrate it will be unique per process

def f(x):
    print("Hey there {}".format(x))
    print('initresult is {}'.format(initresult))
    return x+1

def initmap(executor, initializer, initargs, f, it):
    return executor.map(partial(initwrapper, initializer, initargs, f), it)


if __name__ == "__main__":
    with ProcessPoolExecutor(4) as executor:
        out = initmap(executor, do_init, (5,6), f, range(10))
    print(list(out))

Output:

ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
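
For the multiprocessing.Pool route mentioned above, here is a minimal sketch of its initializer/initargs options. The names init_worker, tag, run_pool and the "job" prefix are made up for illustration, and the pool_cls parameter is only there so the same code can be tried with the thread-based multiprocessing.pool.ThreadPool, which takes the same two options.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool  # thread-based pool with the same interface

_config = None  # per-worker state, filled in by the initializer


def init_worker(prefix):
    # Hypothetical initializer: the pool calls this once in each worker
    # as the worker starts, before it handles any task.
    global _config
    _config = {"prefix": prefix}


def tag(x):
    # The function to be parallelized; it reads the state set by init_worker.
    return "{}-{}".format(_config["prefix"], x)


def run_pool(items, pool_cls=multiprocessing.Pool):
    # initargs is unpacked into the initializer: init_worker("job")
    with pool_cls(4, initializer=init_worker, initargs=("job",)) as pool:
        return pool.map(tag, items)
```

When using the default process-based pool, call run_pool from under an `if __name__ == "__main__":` guard, as in the example above.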
dano answered Oct 21 '22 18:10


As of Python 3.7, both the ThreadPoolExecutor and the ProcessPoolExecutor have the optional initializer and initargs arguments. Each thread/process will call initializer(*initargs) after starting.

See https://docs.python.org/3.7/library/concurrent.futures.html .
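
A minimal sketch of how that might look, assuming Python 3.7 or later. The names load_table, lookup and run are hypothetical, and the dictionary of squares stands in for whatever data the real initialization loads. The executor class is a parameter only because both executors accept the same two arguments.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

_table = None  # per-worker state, filled in by the initializer


def load_table(size):
    # Hypothetical stand-in for an expensive load. The executor calls
    # load_table(*initargs) once in each worker as it starts.
    global _table
    _table = {i: i * i for i in range(size)}


def lookup(key):
    # The function to be parallelized; it reads what load_table prepared.
    return _table[key]


def run(keys, executor_cls=ProcessPoolExecutor, size=100):
    with executor_cls(max_workers=4,
                      initializer=load_table,
                      initargs=(size,)) as executor:
        # map preserves input order, so results line up with keys
        return list(executor.map(lookup, keys))
```

With the default ProcessPoolExecutor, each worker process builds its own copy of the table, so call run from under an `if __name__ == "__main__":` guard. Passing executor_cls=ThreadPoolExecutor runs the same initializer once per worker thread instead, with all threads writing to the one shared global.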

RuRo answered Oct 21 '22 19:10