How to keep track of status with multiprocessing and pool.map?

I'm setting up a multiprocessing module for the first time, and basically, I am planning to do something along the lines of

from multiprocessing import Pool
pool = Pool(processes=102)
results = pool.map(whateverFunction, myIterable)
print(1)

As I understand it, 1 will be printed as soon as all the processes have come back and results is complete. I would like to have some status update on these. What is the best way of implementing that?

I'm hesitant to make whateverFunction() print. Especially with around 200 values, I'd have something like 'process done' printed 200 times, which is not very useful.

I expect output like

10% of myIterable done
20% of myIterable done
asked Jan 16 '16 by FooBar

1 Answer

pool.map blocks until all the concurrent function calls have completed. pool.apply_async does not block. Moreover, you could use its callback parameter to report on progress. The callback function, log_result, is called once each time foo completes. It is passed the value returned by foo.

from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def log_result(retval):
    results.append(retval)
    if len(results) % (len(data)//10) == 0:
        print('{:.0%} done'.format(len(results)/len(data)))

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    for item in data:
        pool.apply_async(foo, args=[item], callback=log_result)
    pool.close()
    pool.join()
    print(results)

yields

10% done
20% done
30% done
40% done
50% done
60% done
70% done
80% done
90% done
100% done
[0, 1, 2, 3, ..., 197, 198, 199]

The log_result function above modifies the global variable results and accesses the global variable data. You cannot pass these variables to log_result because the callback function specified in pool.apply_async is always called with exactly one argument, the return value of foo.

You can, however, make a closure, which at least makes clear what variables log_result depends on:

from __future__ import division
import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x

def make_log_result(results, len_data):
    def log_result(retval):
        results.append(retval)
        if len(results) % (len_data//10) == 0:
            print('{:.0%} done'.format(len(results)/len_data))
    return log_result

if __name__ == '__main__':
    pool = mp.Pool()
    results = []
    data = range(200)
    log_result = make_log_result(results, len(data))
    for item in data:
        pool.apply_async(foo, args=[item], callback=log_result)
    pool.close()
    pool.join()
    print(results)
answered Oct 05 '22 by unutbu