I have a 256x256x256 Numpy array, in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result for the matrix at element [i,j,k] of the original array goes into element [i,j,k] of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:
import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...
    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))
However, it seems like map_async is actually creating this huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.

Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first? Or do you know another way of solving this problem?

Thanks a bunch! :-)
All multiprocessing.Pool.map* methods consume the iterator fully (demo code) as soon as the function is called. To feed the map function one chunk of the iterator at a time, use grouper_nofill:
def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it = iter(iterable)
    def take():
        # Yield successive chunks of at most n items from the iterator
        while 1:
            yield list(itertools.islice(it, n))
    # iter() with a sentinel: stop once take() yields an empty chunk
    return iter(take().next, [])
chunksize = 256
async_results = []
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)
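Since myfunc returns each result paired with its index, the collected pairs can afterwards be scattered back into a new 256x256x256 array. A minimal sketch, iterating over the collected (result, index) pairs (an object dtype is used here only because the question doesn't say what type the per-matrix result has):

# Scatter the (result, index) pairs back into an array shaped like the original
result_array = np.empty((256, 256, 256), dtype=object)
for res, idx in async_results:
    i, j, k = idx
    result_array[i, j, k] = res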
PS. pool.map_async's chunksize parameter does something different: it breaks the iterable into chunks, then gives each chunk to a worker process, which calls map(func, chunk). This can give a worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation, since the iterator still gets consumed fully immediately after the map_async call is issued.
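To illustrate, here is a minimal, self-contained sketch (square is a made-up stand-in for myfunc, not from the question) of how chunksize is passed; the batching only changes how much work each worker gets per task, not how eagerly the iterable is consumed:

import multiprocessing

def square(x):
    # Trivial stand-in for the real per-item work
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    # Each worker processes batches of 1000 items via map(square, chunk),
    # but the whole range is still turned into tasks up front.
    results = pool.map(square, range(10**6), chunksize=1000)
    pool.close()
    pool.join()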
I ran into this problem as well. Instead of this:
res = p.map(func, combinations(arr, select_n))
do
res = p.imap(func, combinations(arr, select_n))
imap doesn't consume the iterable up front!
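Applied to the question's setup, a rough sketch (assuming matrices, inds, myfunc, and izip are defined as in the question, and relying on the claim above that imap does not build the whole task list first) might look like this:

pool = multiprocessing.Pool()
result_array = np.empty((256, 256, 256), dtype=object)

# imap hands the (matrix, index) pairs out to the workers as it goes,
# instead of materializing one giant input list first.
for res, (i, j, k) in pool.imap(myfunc, izip(matrices, inds), chunksize=256):
    result_array[i, j, k] = res

pool.close()
pool.join()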