Apply reduce on generator output with multiprocessing

I have a Python generator function that works like this:

def Mygenerator(x, y, z, ...):
    for i in range(len(x)):
        # code that makes two matrices based on segment i of the input arrays
        yield (matrix1, matrix2)

What I want to do is sum the outputs of this generator element-wise (all the matrix1s together and all the matrix2s together). This line does the job:

from functools import reduce

M1, M2 = reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), Mygenerator(x, y, z, ...))
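A minimal runnable sketch of this serial reduction, assuming a hypothetical finite `toy_generator` in place of `Mygenerator` (whose body is not shown). Integers stand in for the two matrices; the reduction only relies on `+`, so NumPy arrays would be summed element-wise in the same way.

```python
from functools import reduce


def toy_generator(n):
    """Hypothetical stand-in for Mygenerator: yields n pairs lazily.

    Integers replace matrix1 and matrix2; NumPy arrays would be
    combined the same way because only `+` is used below.
    """
    for i in range(n):
        yield (i, 10 * i)


def add_pairs(r1, r2):
    # Combine two pairs component-wise: (a1, b1), (a2, b2) -> (a1 + a2, b1 + b2)
    return (r1[0] + r2[0], r1[1] + r2[1])


# reduce pulls one pair at a time from the generator, so only the running
# total and the current pair are held in memory, never the whole sequence.
M1, M2 = reduce(add_pairs, toy_generator(5))
print(M1, M2)  # 10 100
```

Because the accumulator is just a tuple of running totals, memory use stays constant no matter how many pairs the generator yields.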

I would like to parallelize this to speed up the computations. It is important that the outputs from Mygenerator are reduced as they are yielded, since list(Mygenerator(...)) would take too much memory.

sulkeh asked Oct 20 '22 22:10


1 Answer

To answer my own question, I found a solution that seems to work as I had hoped:

First, Mygenerator is no longer a generator but a regular function. Also, instead of looping through segments of x, y and z inside it, I now pass one segment at a time to the function:

def Myfunction(x_segment, y_segment, z_segment):
    # code that makes two matrices based on the input segments
    return (matrix1, matrix2)

Using multiprocessing.Pool with its imap method, which returns results through an iterator instead of building a list, seems to work:

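import multiprocessing
from functools import reduce

def call_Myfunction(args):
    # imap passes one item per call, so unpack the (x, y, z) segment tuple here
    return Myfunction(*args)

pool = multiprocessing.Pool(ncpus)
results = pool.imap(call_Myfunction, zip(x, y, z))  # yields results in input order
M1, M2 = reduce(lambda r1, r2: (r1[0] + r2[0], r1[1] + r2[1]), results)
pool.close()
pool.join()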
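A self-contained, runnable sketch of the same `Pool.imap` pattern, under stated assumptions: `my_function`, `call_my_function`, `add_pairs` and `parallel_sum` are hypothetical names, integers stand in for the matrices, and `processes=4` / `chunksize=16` are arbitrary illustration values. `Pool.imap` and its `chunksize` parameter are part of the standard `multiprocessing` API; a larger `chunksize` sends several segments per message to a worker, which can reduce inter-process overhead.

```python
import multiprocessing
from functools import reduce


def my_function(x_segment, y_segment, z_segment):
    # Hypothetical stand-in for Myfunction: returns two "matrices" per segment.
    # Integers are used here; NumPy arrays would be summed the same way.
    return (x_segment * y_segment, y_segment + z_segment)


def call_my_function(args):
    # Pool.imap passes exactly one item per call, so unpack the tuple here.
    # Defined at module level so it can be pickled and sent to the workers.
    return my_function(*args)


def add_pairs(r1, r2):
    # Component-wise sum of two (matrix1, matrix2) pairs.
    return (r1[0] + r2[0], r1[1] + r2[1])


def parallel_sum(x, y, z, processes=4, chunksize=16):
    with multiprocessing.Pool(processes) as pool:
        # imap returns an iterator that yields results in input order as they
        # become available, so reduce folds them in one at a time instead of
        # waiting for a complete list as Pool.map would.
        results = pool.imap(call_my_function, zip(x, y, z), chunksize=chunksize)
        return reduce(add_pairs, results)


if __name__ == "__main__":
    n = 1000
    x, y, z = list(range(n)), list(range(n)), list(range(n))
    print(parallel_sum(x, y, z))  # (332833500, 999000)
```

The `if __name__ == "__main__":` guard and the module-level worker functions matter on platforms where workers are started with the spawn method (the default on Windows and macOS): each worker re-imports the main module, and functions are pickled by reference, so lambdas or nested functions cannot be sent to workers. The lambda-free `add_pairs` could equally be a lambda here, since `reduce` runs in the parent process.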

Here I renamed x and y in the lambda expression to r1 and r2 to avoid confusion with the outer variables of the same name. When I tried to pass the generator itself to multiprocessing, I ran into pickling errors (generator objects cannot be pickled).

The only disappointment with this solution is that it did not speed up the computations as much as I had hoped. I guess that is due to overhead, for example pickling the input segments and the returned matrices to send them between processes. With 8 cores, the processing speed increased by only about 10%; with 4 cores, it roughly doubled. This seems to be the best I can do for my particular task, unless there is some other way to parallelize it.
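One way to cut the overhead mentioned above, sketched here as an assumption rather than something this answer measured: let each worker reduce a whole batch of segments locally and send back only one pair of partial sums, so far fewer matrices are pickled between processes. All names are hypothetical, integers stand in for matrices, and `batch_size=64` is arbitrary. `Pool.imap_unordered` is used because the sum does not depend on order; with floating-point matrices the result may differ from the serial sum in the last bits, since floating-point addition is not exactly associative. On Python 3.12+, `itertools.batched` can replace the hand-written `batched` helper (it yields tuples instead of lists).

```python
import itertools
import multiprocessing
from functools import reduce


def my_function(x_segment, y_segment, z_segment):
    # Hypothetical stand-in for Myfunction (integers instead of matrices).
    return (x_segment * y_segment, y_segment + z_segment)


def add_pairs(r1, r2):
    # Component-wise sum of two (matrix1, matrix2) pairs.
    return (r1[0] + r2[0], r1[1] + r2[1])


def sum_batch(batch):
    # Runs in a worker: reduce a whole batch of segments locally, so only
    # one pair of partial sums is pickled back to the parent per batch.
    return reduce(add_pairs, (my_function(*seg) for seg in batch))


def batched(iterable, size):
    # Yield lists of up to `size` items without materializing the whole input.
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch


def parallel_sum_batched(x, y, z, processes=4, batch_size=64):
    with multiprocessing.Pool(processes) as pool:
        # Partial sums arrive in completion order; addition makes order irrelevant.
        partial_sums = pool.imap_unordered(sum_batch, batched(zip(x, y, z), batch_size))
        return reduce(add_pairs, partial_sums)


if __name__ == "__main__":
    n = 1000
    x, y, z = list(range(n)), list(range(n)), list(range(n))
    print(parallel_sum_batched(x, y, z))  # (332833500, 999000)
```

Whether this helps depends on how expensive each segment is relative to transferring its result; the batch size is a tuning knob, not a recommended value.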

Using imap was necessary here, since Pool.map returns a complete list, so all the returned matrices would be stored in memory before the reduce operation could start, and in this case that would not be possible.

sulkeh answered Nov 11 '22 13:11