
Use multi-processing/threading to break numpy array operation into chunks

I have a function that renders an MxN array. The array is very large, so I want to use the function to produce smaller arrays (M1xN, M2xN, ..., MixN, where M1+M2+...+Mi = M) simultaneously using multi-processing/threading, and eventually join these arrays to form the full MxN array. As Boardrider rightly suggested, a viable example will help; the following broadly conveys what I intend to do:

import numpy as n

def mult(y, x):
    # multiply each scalar y[i] by the vector x, one row at a time
    r = n.empty([len(y), len(x)])
    for i in range(len(r)):
        r[i] = y[i] * x
    return r

x = n.random.rand(10000)
y = n.arange(0, 100000, 1)
test = mult(y=y, x=x)

As the lengths of x and y increase, the system will take more and more time. With respect to this example, I want to run the code so that, if I have 4 cores, each core gets a quarter of the job: the 1st core computes rows r[0] to r[24999], the 2nd r[25000] to r[49999], the 3rd r[50000] to r[74999], and the 4th r[75000] to r[99999]. Eventually the results would be clubbed together, appended into one single array r[0] to r[99999], as sketched below.
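
For concreteness, here is a minimal sketch of the naive chunk-and-join version I have in mind (the 4-way split and the helper name mult_chunk are illustrative only):

import numpy as np
from multiprocessing import Pool

def mult_chunk(y_chunk, x):
    # same row-by-row multiply as mult(), restricted to one chunk of y
    r = np.empty([len(y_chunk), len(x)])
    for i in range(len(r)):
        r[i] = y_chunk[i] * x
    return r

if __name__ == '__main__':
    x = np.random.rand(10000)
    y = np.arange(0, 100000, 1)
    chunks = np.array_split(y, 4)            # four roughly equal pieces of y
    with Pool(processes=4) as pool:
        parts = pool.starmap(mult_chunk, [(c, x) for c in chunks])
    r = np.concatenate(parts)                # club the partial results into one M x N array

(Note that this naive version pickles a copy of each chunk, and of x, into every worker, so the copying overhead can eat the gains.)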

I hope this example makes things clear. If my problem is still not clear, please let me know.

asked Sep 21 '16 by Ishan Tomar


1 Answer

The first thing to say is: if it's about multiple cores on the same processor, numpy is already capable of parallelizing the operation better than we could ever do by hand (see the discussion at multiplication of large arrays in python).

In this case the key would be simply to ensure that the multiplication is all done in a wholesale array operation rather than a Python for-loop:

test2 = x[n.newaxis, :] * y[:, n.newaxis]

n.abs(test - test2).max()   # verify equivalence to mult(): should be 0.0, or very small (floating-point precision)
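
To make the difference concrete, here is a quick timing sketch (a rough illustration, not a benchmark: it assumes mult() from the question is in scope, and uses smaller sizes so both results fit comfortably in memory):

import time
import numpy as n

x = n.random.rand(1000)
y = n.arange(0, 10000, 1)

t0 = time.perf_counter()
test = mult(y=y, x=x)                        # row-by-row Python loop
t1 = time.perf_counter()
test2 = x[n.newaxis, :] * y[:, n.newaxis]    # one wholesale broadcast operation
t2 = time.perf_counter()

print('loop:      %.4f s' % (t1 - t0))
print('broadcast: %.4f s' % (t2 - t1))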

[If you actually wanted to spread this across multiple separate CPUs, that's a different matter, but the question seems to suggest a single (multi-core) CPU.]


OK, bearing the above in mind: let's suppose you want to parallelize an operation more complicated than just mult(). Let's assume you've tried hard to optimize your operation into wholesale array operations that numpy can parallelize itself, but your operation just isn't amenable to this. In that case, you can use a shared-memory multiprocessing.Array created with lock=False, and multiprocessing.Pool to assign processes to address non-overlapping chunks of it, divided up over the y dimension (and also simultaneously over x if you want).

An example listing is provided below. Note that this approach does not explicitly do exactly what you specify (club the results together and append them into a single array). Rather, it does something more efficient: multiple processes simultaneously assemble their portions of the answer in non-overlapping portions of shared memory. Once done, no collation/appending is necessary: we just read out the result.

import os, numpy, multiprocessing, itertools

SHARED_VARS = {} # the best way to get multiprocessing.Pool to send shared multiprocessing.Array objects between processes is to attach them to something global - see http://stackoverflow.com/questions/1675766/

def operate( slices ):
    # grok the inputs
    yslice, xslice = slices
    y, x, r = get_shared_arrays('y', 'x', 'r')
    # create views of the appropriate chunks/slices of the arrays:
    y = y[yslice]
    x = x[xslice]
    r = r[yslice, xslice]
    # do the actual business
    for i in range(len(r)):
        r[i] = y[i] * x  # If this is truly all operate() does, it can be parallelized far more efficiently by numpy itself.
                         # But let's assume this is a placeholder for something more complicated.

    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)

def check(y, x, r):
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis]  # NB: this check is only valid if operate() literally does only multiplication (in which case this whole business is unnecessary)
    print( 'max. abs. diff. = %g' % numpy.abs(r - r2).max() )
    return y, x, r

def slicestr(s):
    return ':'.join( '' if x is None else str(x) for x in [s.start, s.stop, s.step] )

def m2n(buf, shape, typecode, ismatrix=False):
    """
    Return a numpy.array VIEW of a multiprocessing.Array given a
    handle to the array, the shape, the data typecode, and a boolean
    flag indicating whether the result should be cast as a matrix.
    """
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
    if ismatrix: a = numpy.asmatrix(a)
    return a

def n2m(a):
    """
    Return a multiprocessing.Array COPY of a numpy.array, together
    with shape, typecode and matrix flag.
    """
    if not isinstance(a, numpy.ndarray): a = numpy.array(a)
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)

def new_shared_array(shape, typecode='d', ismatrix=False):
    """
    Allocate a new shared array and return all the details required
    to reinterpret it as a numpy array or matrix (same order of
    output arguments as n2m)
    """
    typecode = numpy.dtype(typecode).char
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix

def get_shared_arrays(*names):
    return [m2n(*SHARED_VARS[name]) for name in names]

def init(*pargs, **kwargs):
    SHARED_VARS.update(pargs, **kwargs)

if __name__ == '__main__':

    ylen = 1000
    xlen = 2000

    init( y=n2m(range(ylen)) )
    init( x=n2m(numpy.random.rand(xlen)) )
    init( r=new_shared_array([ylen, xlen], float) )

    print('Master process ID is %s' % os.getpid())

    #print( operate([slice(None), slice(None)]) ); check(*get_shared_arrays('y', 'x', 'r'))  # local test

    pool = multiprocessing.Pool(initializer=init, initargs=tuple(SHARED_VARS.items()))  # tuple() so initargs is a plain sequence of (name, array) pairs
    yslices = [slice(0,333), slice(333,666), slice(666,None)]
    xslices = [slice(0,1000), slice(1000,None)]
    #xslices = [slice(None)]  # uncomment this if you only want to divide things up in the y dimension
    reports = pool.map(operate, itertools.product(yslices, xslices))
    print('\n'.join(reports))
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))
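
If you'd rather derive the slice boundaries from the core count instead of hardcoding them, a small helper along these lines would do (make_slices is an illustrative name, not part of the listing above):

def make_slices(total, nchunks):
    # split range(total) into nchunks contiguous slices of near-equal size
    step = -(-total // nchunks)   # ceiling division
    return [slice(i, min(i + step, total)) for i in range(0, total, step)]

# e.g. replace the hardcoded slices above with:
#   yslices = make_slices(ylen, multiprocessing.cpu_count())
#   xslices = [slice(None)]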
answered Nov 17 '22 by jez