How to use Python multiprocessing Pool.map to fill numpy array in a for loop

I want to fill a 2D numpy array within a for loop and speed up the calculation by using multiprocessing.

import numpy
from multiprocessing import Pool


array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)

def fill_array(start_val):
    return range(start_val,start_val+10)

list_start_vals = range(40,60)
for line in xrange(20):
    array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()

print array_2D

When I run it, Python starts 4 subprocesses and occupies 4 CPU cores, but the execution doesn't finish and the array is never printed. If I try to write the array to disk, nothing happens.

Can anyone tell me why?

asked Sep 17 '14 at 10:09 by MoTSCHIGGE


2 Answers

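The following works. First, it is a good idea to protect the main part of your code inside an if __name__ == '__main__' block in order to avoid weird side effects. This also ensures that fill_array is defined before the pool's worker processes are started, which is not the case in the question's code. The result of pool.map() is a list containing the evaluations for each value in the iterable list_start_vals, so you don't have to create array_2D beforehand and no for loop is needed.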

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val+10))

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.array(pool.map(fill_array, list_start_vals))
    pool.close() # ATTENTION HERE
    print(array_2D)  # parenthesized form works in Python 2 and 3

You may run into trouble with pool.close(). Following the comments from @hpaulj, you can simply remove that line if it causes problems.
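On Python 3.3 or later, one way to sidestep the question of when to call pool.close() is to use the pool as a context manager. The following is only a sketch under that assumption; build_array is a hypothetical helper name introduced here for illustration, not something from the answer above.

```python
import numpy as np
from multiprocessing import Pool


def fill_array(start_val):
    # One row of the result: ten consecutive integers starting at start_val.
    return list(range(start_val, start_val + 10))


def build_array(start_vals, processes=4):
    # Hypothetical helper: the with-block shuts the pool down on exit,
    # so no explicit pool.close() call is needed.
    with Pool(processes=processes) as pool:
        rows = pool.map(fill_array, start_vals)
    # pool.map returns one list per start value, in input order.
    return np.array(rows)


if __name__ == '__main__':
    array_2D = build_array(range(40, 60))
    print(array_2D.shape)
```

Leaving the with-block terminates the workers, which is safe here because pool.map() has already returned all results by then.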

answered Oct 14 '22 at 15:10 by Saullo G. P. Castro


If you still want to fill the array row by row, you can use pool.apply_async instead of pool.map. Working from Saullo's answer:

import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val, start_val+10)

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.zeros((20,10))
    for line, val in enumerate(list_start_vals):
        result = pool.apply_async(fill_array, [val])
        array_2D[line,:] = result.get()
    pool.close()
    print(array_2D)  # parenthesized form works in Python 2 and 3

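This runs a bit slower than the map version, because result.get() blocks until each task has finished before the next one is submitted, so the rows are effectively computed one at a time. But it does not produce a runtime error like my test of the map version did:

Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored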
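If the apply_async tasks should actually overlap, one option is to submit them all first and only collect the results afterwards. This is a rough sketch rather than a tested replacement for the code above; fill_rows is a hypothetical helper name, and the pool.join() call after pool.close() is my addition.

```python
import numpy as np
from multiprocessing import Pool


def fill_array(start_val):
    # One row of the result: ten consecutive integers starting at start_val.
    return list(range(start_val, start_val + 10))


def fill_rows(array_2D, start_vals, processes=4):
    # Hypothetical helper: fills array_2D in place, one row per start value.
    pool = Pool(processes=processes)
    try:
        # Submit every task before waiting on any of them,
        # so the workers can run them in parallel.
        pending = [pool.apply_async(fill_array, (val,)) for val in start_vals]
        # Collect results in submission order; get() blocks per task.
        for line, result in enumerate(pending):
            array_2D[line, :] = result.get()
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker processes to exit
    return array_2D


if __name__ == '__main__':
    array_2D = fill_rows(np.zeros((20, 10)), range(40, 60))
    print(array_2D[0])
```

For a task as small as this one, the overhead of sending work to the subprocesses will likely dominate either way, so the difference only matters when each row takes real time to compute.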

answered Oct 14 '22 at 14:10 by hpaulj