I have a function that renders an MxN array. The array is very large, so I want to use the function to produce small arrays (M1xN, M2xN, M3xN, ..., MixN, where M1+M2+M3+...+Mi = M) simultaneously using multiprocessing/threading, and eventually join these arrays to form an MxN array. As Mr. Boardrider rightfully suggested providing a viable example, the following example broadly conveys what I intend to do:
import numpy as n

def mult(y, x):
    r = n.empty([len(y), len(x)])
    for i in range(len(r)):
        r[i] = y[i] * x  # each row of r is the scalar y[i] times the vector x
    return r

x = n.random.rand(10000)
y = n.arange(0, 100000, 1)
test = mult(y=y, x=x)
As the lengths of x and y increase, the system will take more and more time. With respect to this example, I want to run this code such that if I have 4 cores, I can give a quarter of the job to each: i.e. give the job of computing elements r[0] to r[24999] to the 1st core, r[25000] to r[49999] to the 2nd core, r[50000] to r[74999] to the 3rd core, and r[75000] to r[99999] to the 4th core. Eventually club the results, appending them to get one single array r[0] to r[99999].
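To make the intended split concrete, these four row ranges can be generated programmatically; a minimal sketch using numpy.array_split, assuming 4 cores:

import numpy as n

rows = n.arange(100000)          # row indices of r
chunks = n.array_split(rows, 4)  # one contiguous block of 25000 rows per core
for c in chunks:
    print('r[%d] to r[%d]' % (c[0], c[-1]))  # r[0] to r[24999], ..., r[75000] to r[99999]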
I hope this example makes things clear. If my problem is still not clear, please let me know.
The first thing to say is: if it's about multiple cores on the same processor, numpy is already capable of parallelizing the operation better than we could ever do by hand (see the discussion at multiplication of large arrays in python). In this case the key is simply to ensure that the multiplication is all done in a wholesale array operation rather than a Python for-loop:
test2 = x[n.newaxis, :] * y[:, n.newaxis]
n.abs( test - test2 ).max() # verify equivalence to mult(): output should be 0.0, or very small reflecting floating-point precision limitations
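To get a feel for the gap, here is a rough timing sketch (with smaller sizes than the question's so it runs quickly; the exact ratio will vary by machine and numpy build):

import timeit
import numpy as n

def mult(y, x):  # the loop-based version from the question
    r = n.empty([len(y), len(x)])
    for i in range(len(r)):
        r[i] = y[i] * x
    return r

x = n.random.rand(1000)
y = n.arange(0, 10000, 1)
t_loop  = timeit.timeit(lambda: mult(y, x), number=5)
t_bcast = timeit.timeit(lambda: x[n.newaxis, :] * y[:, n.newaxis], number=5)
print('loop: %.3fs   broadcast: %.3fs' % (t_loop, t_bcast))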
[If you actually wanted to spread this across multiple separate CPUs, that's a different matter, but the question seems to suggest a single (multi-core) CPU.]
OK, bearing the above in mind: let's suppose you want to parallelize an operation more complicated than just mult(). Let's assume you've tried hard to optimize your operation into wholesale array operations that numpy can parallelize itself, but your operation just isn't susceptible to this. In that case, you can use a shared-memory multiprocessing.Array created with lock=False, and multiprocessing.Pool to assign processes to address non-overlapping chunks of it, divided up over the y dimension (and also simultaneously over x if you want). An example listing is provided further below. Note that this approach does not explicitly do exactly what you specify (club the results together and append them into a single array). Rather, it does something more efficient: multiple processes simultaneously assemble their portions of the answer in non-overlapping portions of shared memory. Once done, no collation/appending is necessary: we just read out the result.
import os, numpy, multiprocessing, itertools

# The best way to get multiprocessing.Pool to send shared multiprocessing.Array
# objects between processes is to attach them to something global - see
# http://stackoverflow.com/questions/1675766/
SHARED_VARS = {}
def operate(slices):
    # grok the inputs
    yslice, xslice = slices
    y, x, r = get_shared_arrays('y', 'x', 'r')
    # create views of the appropriate chunks/slices of the arrays:
    y = y[yslice]
    x = x[xslice]
    r = r[yslice, xslice]
    # do the actual business
    for i in range(len(r)):
        # If this is truly all operate() does, it can be parallelized far more
        # efficiently by numpy itself. But let's assume this is a placeholder
        # for something more complicated.
        r[i] = y[i] * x
    return 'Process %d operated on y[%s] and x[%s] (%d x %d chunk)' % (os.getpid(), slicestr(yslice), slicestr(xslice), y.size, x.size)
def check(y, x, r):
    # obviously this check is only valid if operate() literally does nothing but
    # multiplication (in which case this whole business is unnecessary)
    r2 = x[numpy.newaxis, :] * y[:, numpy.newaxis]
    print('max. abs. diff. = %g' % numpy.abs(r - r2).max())
    return y, x, r
def slicestr(s):
    return ':'.join('' if x is None else str(x) for x in [s.start, s.stop, s.step])
def m2n(buf, shape, typecode, ismatrix=False):
    """
    Return a numpy.array VIEW of a multiprocessing.Array given a
    handle to the array, the shape, the data typecode, and a boolean
    flag indicating whether the result should be cast as a matrix.
    """
    a = numpy.frombuffer(buf, dtype=typecode).reshape(shape)
    if ismatrix: a = numpy.asmatrix(a)
    return a
def n2m(a):
    """
    Return a multiprocessing.Array COPY of a numpy.array, together
    with shape, typecode and matrix flag.
    """
    if not isinstance(a, numpy.ndarray): a = numpy.array(a)
    return multiprocessing.Array(a.dtype.char, a.flat, lock=False), tuple(a.shape), a.dtype.char, isinstance(a, numpy.matrix)
def new_shared_array(shape, typecode='d', ismatrix=False):
    """
    Allocate a new shared array and return all the details required
    to reinterpret it as a numpy array or matrix (same order of
    output arguments as n2m)
    """
    typecode = numpy.dtype(typecode).char
    return multiprocessing.Array(typecode, int(numpy.prod(shape)), lock=False), tuple(shape), typecode, ismatrix
def get_shared_arrays(*names):
    return [m2n(*SHARED_VARS[name]) for name in names]

def init(*pargs, **kwargs):
    SHARED_VARS.update(pargs, **kwargs)
if __name__ == '__main__':
    ylen = 1000
    xlen = 2000
    init(y=n2m(range(ylen)))
    init(x=n2m(numpy.random.rand(xlen)))
    init(r=new_shared_array([ylen, xlen], float))
    print('Master process ID is %s' % os.getpid())
    #print(operate([slice(None), slice(None)])); check(*get_shared_arrays('y', 'x', 'r'))  # local test
    pool = multiprocessing.Pool(initializer=init, initargs=SHARED_VARS.items())
    yslices = [slice(0, 333), slice(333, 666), slice(666, None)]
    xslices = [slice(0, 1000), slice(1000, None)]
    #xslices = [slice(None)]  # uncomment this if you only want to divide things up in the y dimension
    reports = pool.map(operate, itertools.product(yslices, xslices))
    print('\n'.join(reports))
    y, x, r = check(*get_shared_arrays('y', 'x', 'r'))
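As a possible refinement: instead of hard-coding yslices and xslices, the slices could be derived from the available core count. A sketch, where chunk_slices is a hypothetical helper, not part of the listing above:

import os, numpy

def chunk_slices(length, nchunks):
    # cut range(length) into nchunks contiguous slices of near-equal size
    bounds = numpy.linspace(0, length, nchunks + 1).astype(int)
    return [slice(int(lo), int(hi)) for lo, hi in zip(bounds[:-1], bounds[1:])]

yslices = chunk_slices(1000, os.cpu_count() or 1)  # one y-chunk per core (ylen = 1000 above)
xslices = [slice(None)]                            # divide along y only, as in the commented-out line

These drop-in replacements feed pool.map(operate, itertools.product(yslices, xslices)) exactly as before.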