
Combining itertools and multiprocessing?

I have a 256x256x256 NumPy array in which each element is a matrix. I need to do some calculations on each of these matrices, and I want to use the multiprocessing module to speed things up.

The results of these calculations must be stored in a 256x256x256 array like the original one, so that the result of the matrix at element [i,j,k] in the original array must be put in the [i,j,k] element of the new array.

To do this, I want to make a list which could be written in a pseudo-ish way as [array[i,j,k], (i, j, k)] and pass it to a function to be "multiprocessed". Assuming that matrices is a list of all the matrices extracted from the original array and myfunc is the function doing the calculations, the code would look somewhat like this:

import multiprocessing
import numpy as np
from itertools import izip

def myfunc(finput):
    # Do some calculations...
    ...

    # ... and return the result and the index:
    return (result, finput[1])

# Make indices:
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3)

# Make function input from the matrices and the indices:
finput = izip(matrices, inds)

pool = multiprocessing.Pool()
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999))

However, it seems like map_async actually creates the whole huge finput list first: my CPUs aren't doing much, but the memory and swap get completely consumed in a matter of seconds, which is obviously not what I want.

Is there a way to pass this huge list to a multiprocessing function without the need to explicitly create it first? Or do you know another way of solving this problem?

Thanks a bunch! :-)

digitaldingo asked Sep 05 '11 10:09


2 Answers

All multiprocessing.Pool.map* methods consume iterators fully (demo code) as soon as the function is called. To feed the map function the iterator one chunk at a time, use grouper_nofill:

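import itertools

def grouper_nofill(n, iterable):
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    '''
    it = iter(iterable)
    def take():
        # Yield successive lists of up to n items; an empty list means exhausted.
        while 1: yield list(itertools.islice(it, n))
    # iter(callable, sentinel) stops as soon as take() yields [].
    return iter(take().next, [])

chunksize = 256
async_results = []
# Only one chunk of inputs is materialized at a time.
for finput in grouper_nofill(chunksize, itertools.izip(matrices, inds)):
    async_results.extend(pool.map_async(myfunc, finput).get())
async_results = np.array(async_results)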
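The code above is Python 2 (itertools.izip and the generator's .next method no longer exist in Python 3). A minimal Python 3 sketch of the same chunking idea might look like the following. The helper name map_in_chunks and its pool_map parameter are hypothetical and not part of multiprocessing; pool_map is assumed to be any callable with the calling convention of Pool.map.

```python
import itertools


def grouper_nofill(n, iterable):
    """Yield successive lists of at most n items; the last list may be shorter."""
    it = iter(iterable)
    # iter(callable, sentinel) keeps calling the lambda until it returns [].
    return iter(lambda: list(itertools.islice(it, n)), [])


def map_in_chunks(pool_map, func, iterable, chunksize=256):
    """Apply func chunk by chunk (hypothetical helper).

    pool_map is assumed to behave like Pool.map: pool_map(func, items) -> list.
    Only one chunk of inputs is pulled from the iterable at a time, although
    the results still accumulate in a single list.
    """
    results = []
    for chunk in grouper_nofill(chunksize, iterable):
        results.extend(pool_map(func, chunk))
    return results
```

With a real pool this would be called as map_in_chunks(pool.map, myfunc, zip(matrices, inds)), since the built-in zip is already lazy in Python 3.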

PS. pool.map_async's chunksize parameter does something different: It breaks the iterable into chunks, then gives each chunk to a worker process which calls map(func,chunk). This can give the worker process more data to chew on if func(item) finishes too quickly, but it does not help in your situation since the iterator still gets consumed fully immediately after the map_async call is issued.

unutbu answered Oct 30 '22 13:10


I ran into this problem as well. Instead of this:

res = p.map(func, combinations(arr, select_n))

do this:

res = p.imap(func, combinations(arr, select_n))

imap doesn't consume the whole iterable up front!
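Applied to the original question, the imap approach could be sketched roughly as below (Python 3). This is an illustration under assumptions, not a tested solution for the 256x256x256 case: indexed_items, fill_results and run_with_process_pool are hypothetical helper names, the body of myfunc is a placeholder calculation, and out is assumed to be anything that accepts out[(i, j, k)] = value, such as a preallocated NumPy array. The matrices are assumed to be iterated in row-major order so that they line up with the generated indices.

```python
import itertools
import multiprocessing


def indexed_items(matrices, shape):
    """Lazily pair each (i, j, k) index with its matrix, in row-major order."""
    indices = itertools.product(*(range(n) for n in shape))
    return zip(indices, matrices)


def myfunc(item):
    """Placeholder worker: returns the index together with the result."""
    idx, matrix = item
    result = sum(map(sum, matrix))  # stand-in for the real calculation
    return idx, result


def fill_results(pool, func, matrices, shape, out, chunksize=64):
    """Store each result at its index as soon as imap yields it."""
    items = indexed_items(matrices, shape)
    for idx, result in pool.imap(func, items, chunksize=chunksize):
        out[idx] = result
    return out


def run_with_process_pool(matrices, shape, out):
    """Call this from under an `if __name__ == "__main__":` guard."""
    with multiprocessing.Pool() as pool:
        return fill_results(pool, myfunc, matrices, shape, out)
```

Because each result carries its own index, the same loop should also work with pool.imap_unordered, which yields results in completion order rather than input order.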

MutantTurkey answered Oct 30 '22 13:10