I'm using the multiprocessing module for the first time, and I am planning to do something along the lines of
from multiprocessing import Pool
pool = Pool(processes=102)
results = pool.map(whateverFunction, myIterable)
print 1
As I understand it, 1 will be printed as soon as all the processes have come back and results is complete. I would like to have some status updates while they run. What is the best way of implementing that?
I'm hesitant to make whateverFunction() print. With around 200 values, I would get something like 'process done' printed 200 times, which is not very useful.
I expect output like
10% of myIterable done
20% of myIterable done
pool.map blocks until all the concurrent function calls have completed. pool.apply_async does not block. Moreover, you could use its callback parameter to report on progress. The callback function, log_result, is called once each time foo completes. It is passed the value returned by foo.
from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def log_result(retval):
    # Called in the parent process each time a foo call finishes.
    results.append(retval)
    # Report whenever another tenth of the data has completed.
    if len(results) % (len(data)//10) == 0:
        print('{:.0%} done'.format(len(results)/len(data)))

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    for item in data:
        pool.apply_async(foo, args=[item], callback=log_result)
    pool.close()
    pool.join()
    print(results)
yields
10% done
20% done
30% done
40% done
50% done
60% done
70% done
80% done
90% done
100% done
[0, 1, 2, 3, ..., 197, 198, 199]
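One caveat about the modulus check: with fewer than 10 items, len(data)//10 is 0, so the check raises ZeroDivisionError inside the callback, and when the length is not a multiple of 10 the last message may stop short of 100%. A minimal sketch of a guarded check follows. The helper names reporting_step and should_report are hypothetical and not part of the original answer.

```python
def reporting_step(total, steps=10):
    """Number of completed tasks between progress messages (never zero)."""
    return max(1, total // steps)


def should_report(done, total, steps=10):
    """True when `done` lands on a reporting point, or when all tasks are done."""
    return done % reporting_step(total, steps) == 0 or done == total


if __name__ == '__main__':
    # Show which completion counts would trigger a message for a few sizes.
    for total in (5, 200, 205):
        points = [done for done in range(1, total + 1)
                  if should_report(done, total)]
        print(total, points)
```

Inside a callback, the condition would become should_report(len(results), len(data)).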
The log_result function above modifies the global variable results and accesses the global variable data. You cannot pass these variables to log_result because the callback function specified in pool.apply_async is always called with exactly one argument, the return value of foo.
You can, however, make a closure, which at least makes clear what variables log_result depends on:
from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def make_log_result(results, len_data):
    # The inner function keeps access to results and len_data.
    def log_result(retval):
        results.append(retval)
        if len(results) % (len_data//10) == 0:
            print('{:.0%} done'.format(len(results)/len_data))
    return log_result

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    # Build the callback once and reuse it for every task.
    log_result = make_log_result(results, len(data))
    for item in data:
        pool.apply_async(foo, args=[item], callback=log_result)
    pool.close()
    pool.join()
    print(results)
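As an alternative to callbacks (not part of the answer above), pool.imap_unordered yields each return value in the parent process as soon as it is ready, so progress can be reported from an ordinary loop with no globals or closures. The sketch below assumes Python 3, where Pool can be used as a context manager. The names progress_message and run_with_progress are hypothetical helpers, and the message text follows the format requested in the question.

```python
import multiprocessing as mp
import time


def foo(x):
    # Stand-in for the real work, as in the examples above.
    time.sleep(0.01)
    return x


def progress_message(done, total, last_bucket, step=10):
    """Return (message or None, bucket) for the current completion count.

    A message is produced only when the completed percentage, rounded
    down to a multiple of `step`, has moved past `last_bucket`.
    """
    percent = 100 * done // total
    bucket = percent - percent % step
    if bucket > last_bucket:
        return '{}% of myIterable done'.format(bucket), bucket
    return None, last_bucket


def run_with_progress(func, iterable_items, processes=None):
    """Apply func to every item in a pool, printing progress as results arrive."""
    items = list(iterable_items)
    results = []
    bucket = 0
    with mp.Pool(processes) as pool:
        # Results arrive in completion order, not input order.
        for retval in pool.imap_unordered(func, items):
            results.append(retval)
            message, bucket = progress_message(len(results), len(items), bucket)
            if message is not None:
                print(message)
    return results


if __name__ == '__main__':
    results = run_with_progress(foo, range(200))
    print(len(results))
```

Because the results come back in completion order, sort them or use pool.imap instead if the input order matters.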