Multiprocessing pool 'apply_async' only seems to call function once

I've been following the docs to try to understand multiprocessing pools. I came up with this:

import time
from multiprocessing import Pool

def f(a):
    print 'f(' + str(a) + ')'
    return True

t = time.time()
pool = Pool(processes=10)
result = pool.apply_async(f, (1,))
print result.get()
pool.close()
print ' [i] Time elapsed ' + str(time.time() - t)

I'm trying to use 10 processes to evaluate the function f(a). I've put a print statement in f.

This is the output I'm getting:

$ python pooltest.py 
f(1)
True
 [i] Time elapsed 0.0270888805389

It appears to me that the function f is only getting evaluated once.

I'm likely not using the right method, but the end result I'm looking for is to run f with 10 processes simultaneously and get the result returned by each one of those processes. So I would end up with a list of 10 results (which may or may not be identical).

The docs on multiprocessing are quite confusing, and it's not trivial to figure out which approach I should be taking. It seems to me that f should be run 10 times in the example I provided above.

Juicy asked Feb 09 '15 22:02


3 Answers

apply_async isn't meant to launch multiple processes; it's just meant to call the function with the arguments in one of the processes of the pool. You'll need to make 10 calls if you want the function to be called 10 times.
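As a minimal sketch of "make 10 calls" (assuming Python 3; the helper name run_ten_times is made up for illustration, and f is the toy function from the question without its print):

```python
from multiprocessing import Pool


def f(a):
    # Same toy function as in the question.
    return True


def run_ten_times(arg=1, calls=10):
    """Submit `calls` separate tasks and collect one result per task."""
    with Pool(processes=10) as pool:
        # Each apply_async call queues exactly one invocation of f.
        pending = [pool.apply_async(f, (arg,)) for _ in range(calls)]
        # get() blocks until that particular task has finished.
        return [r.get() for r in pending]


if __name__ == "__main__":
    print(run_ten_times())  # a list of 10 results, one per call
```

Collecting the AsyncResult objects first and calling get() afterwards is what lets the ten calls overlap; calling get() right after each apply_async would run them one at a time.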

First, note the docs on apply() (emphasis added):

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

Now, in the docs for apply_async():

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

The difference between the two is just that apply_async returns immediately. You can use map() to call a function multiple times, though if you're calling with the same inputs, then it's a little redundant to create the list of the same argument just to have a sequence of the right length.
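For the same-input case, the map() variant could look roughly like this (a sketch assuming Python 3; f here is a hypothetical stand-in that doubles its argument so the results are visible, and map_same_input is an invented helper name):

```python
from multiprocessing import Pool


def f(a):
    # Hypothetical stand-in for the real work.
    return a * 2


def map_same_input(arg=1, calls=10):
    with Pool(processes=10) as pool:
        # The list exists only to tell map() how many calls to make.
        return pool.map(f, [arg] * calls)


if __name__ == "__main__":
    print(map_same_input())  # ten copies of f(1)
```

map() blocks until every call has finished and returns the results in input order, so no separate get() step is needed.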

However, if you're calling different functions with the same input, then you're really just calling a higher order function, and you could do it with map or map_async() like this:

pool.map(lambda f: f(1), functions)

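except that lambda functions aren't picklable, so you'd need to use a defined function (see How to let Pool.map take a lambda function). Python 2's builtin apply() (not the multiprocessing one) makes the same kind of call, but it is deprecated in Python 2 and gone in Python 3, and it takes the function and its argument tuple as two separate parameters, whereas pool.map passes each item as a single argument. So it works as a direct call but not as the function handed to pool.map:

[apply(f, (1,)) for f in functions]  # Python 2 only, runs serially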

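It's easy enough to write your own wrapper that takes the packed tuple, too:

def apply_(packed):
    # pool.map hands over each list item as one argument, so unpack it here.
    f, args = packed[0], packed[1:]
    return f(*args)

pool.map(apply_, [(f, 1) for f in functions])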
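
On Python 3.3 or later, Pool.starmap unpacks each tuple into positional arguments, so a helper with a normal signature works. A sketch under that assumption (double, square, call and call_each are made-up names for illustration):

```python
from multiprocessing import Pool


def double(x):
    return x * 2


def square(x):
    return x * x


def call(func, *args):
    # Module-level helper, so it can be pickled (unlike a lambda).
    return func(*args)


def call_each(functions, arg):
    with Pool(processes=2) as pool:
        # starmap unpacks each tuple, so every task runs call(func, arg).
        return pool.starmap(call, [(func, arg) for func in functions])


if __name__ == "__main__":
    print(call_each([double, square], 3))  # [6, 9]
```

The functions in the list have to be defined at module level as well, because they are pickled by name when they are sent to the workers.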

Joshua Taylor answered Nov 10 '22 03:11


Each time you write pool.apply_async(...), it delegates that one function call to one of the processes that were started in the pool. If you want to call the function in multiple processes, you need to issue multiple pool.apply_async calls.

Note, there also exists a pool.map (and pool.map_async) function which will take a function and an iterable of inputs:

inputs = range(30)
results = pool.map(f, inputs)

These functions apply f to each item in the inputs iterable. They split the inputs into batches (chunks) and hand those to the pool, so the load gets balanced fairly evenly among all the processes in the pool.
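
To see the batching, one option is to return the worker's process ID along with each input. This is only a sketch assuming Python 3; tag_with_pid and map_in_chunks are invented names, and the chunksize of 5 is an arbitrary choice (by default map picks one for you):

```python
import os
from multiprocessing import Pool


def tag_with_pid(x):
    # Return the input together with the ID of the worker that handled it.
    return x, os.getpid()


def map_in_chunks(n=30, workers=4, chunksize=5):
    with Pool(processes=workers) as pool:
        # chunksize sets how many consecutive inputs form one batch.
        return pool.map(tag_with_pid, range(n), chunksize=chunksize)


if __name__ == "__main__":
    results = map_in_chunks()
    print([x for x, _ in results])            # inputs come back in order
    print(len({pid for _, pid in results}))   # number of workers that took part
```

Which worker picks up which batch varies from run to run, but the results always come back in the order of the inputs.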

mgilson answered Nov 10 '22 04:11


If you want to run a single piece of code in ten processes, each of which then exits, a Pool of ten processes is probably not the right thing to use.

Instead, create ten Processes to run the code:

import multiprocessing

processes = []

for _ in range(10):
    p = multiprocessing.Process(target=f, args=(1,))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

The multiprocessing.Pool class is designed to handle situations where the number of processes and the number of jobs are unrelated. Often the number of processes is selected to be the number of CPU cores you have, while the number of jobs is much larger.
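
One thing to keep in mind with plain Process objects is that the target's return value is thrown away, so getting the ten results back needs some channel of your own. A rough sketch using multiprocessing.Queue (assuming Python 3; worker and run_in_processes are hypothetical names, and f is the question's function without the print):

```python
import multiprocessing


def f(a):
    return True


def worker(a, queue):
    # A Process discards its target's return value, so send it back explicitly.
    queue.put(f(a))


def run_in_processes(arg=1, count=10):
    queue = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=worker, args=(arg, queue))
        for _ in range(count)
    ]
    for p in processes:
        p.start()
    # Read one result per process before joining, so no child is left
    # waiting to flush data into the queue.
    results = [queue.get() for _ in processes]
    for p in processes:
        p.join()
    return results


if __name__ == "__main__":
    print(run_in_processes())  # a list of 10 results
```

Results arrive in completion order rather than start order; if the order matters, put an index into the queue alongside each result.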

Blckknght answered Nov 10 '22 04:11