I am making a process pool, and each worker needs to write to different parts of a matrix that exists in the main program. There is no fear of overwriting information, as each process will work with different rows of the matrix. How can I make the matrix writable from within the processes?
The program is a matrix multiplier a professor assigned me, and it has to be multiprocessed. It will create a process for every core the computer has. The main program will send different parts of the matrix to the processes, they will compute them, and then they will return them in a way that lets me identify which response corresponds to which row it was based on.
Have you tried using the multiprocessing.Array class to establish some shared memory?
See also the example from the docs:
    from multiprocessing import Process, Value, Array

    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]

    if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))

        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()

        print(num.value)
        print(arr[:])
Just extend this to a matrix of size h*w, with i*w+j-style indexing. Then, add multiple processes using a process Pool.
The cost of creating new processes, or of copying matrices between them if processes are reused, overshadows the cost of the matrix multiplication itself. Besides, numpy.dot() can utilize different CPU cores by itself.
Matrix multiplication can be distributed between processes by computing different rows of the result in different processes, e.g., given input matrices a and b, the (i,j)-th element of the result is:

    out[i,j] = sum(a[i,:] * b[:,j])
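As a quick sanity check (a throwaway sketch, not part of the solution), the per-element formula agrees with numpy's built-in product:

```python
import numpy as np

a = np.random.rand(3, 4)
b = np.random.rand(4, 5)

out = np.empty((3, 5))
for i in range(out.shape[0]):
    for j in range(out.shape[1]):
        # (i, j)-th element: elementwise product of row i and column j, summed
        out[i, j] = (a[i, :] * b[:, j]).sum()

assert np.allclose(out, np.dot(a, b))
```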
So the i-th row can be computed as:

    import numpy as np

    def dot_slice(a, b, out, i):
        t = np.empty_like(a[i,:])
        for j in range(b.shape[1]):
            # out[i,j] = sum(a[i,:] * b[:,j]); here i is a slice of rows
            np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])
A numpy array accepts a slice as an index, e.g., a[1:3,:] returns the 2nd and 3rd rows. Here a and b are read-only, so they can be inherited as-is by child processes (exploiting copy-on-write on Linux); the result is computed using a shared array. Only indexes are copied during computations:
    import ctypes
    import multiprocessing as mp

    def dot(a, b, nprocesses=mp.cpu_count()):
        """Perform matrix multiplication using multiple processes."""
        if (a.shape[1] != b.shape[0]):
            raise ValueError("wrong shape")

        # create shared array
        mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])

        # start processes
        np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
        pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)

        # perform multiplication
        for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
            print("done %s" % (i,))
        pool.close()
        pool.join()

        # return result
        return tonumpyarray(*np_args)
Where:

    def mpdot_slice(i):
        dot_slice(ga, gb, gout, i)
        return i

    def init(a, b, *np_args):
        """Called on each child process initialization."""
        global ga, gb, gout
        ga, gb = a, b
        gout = tonumpyarray(*np_args)
    def tonumpyarray(mp_arr, shape, dtype):
        """Convert shared multiprocessing array to numpy array.

        No data copying.
        """
        return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)
    def slices(nitems, mslices):
        """Split nitems into mslices pieces.

        >>> list(slices(10, 3))
        [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
        >>> list(slices(1, 3))
        [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
        """
        step = nitems // mslices + 1
        for i in range(mslices):
            yield slice(i*step, min(nitems, (i+1)*step))
To test it:
    def test():
        n = 100000
        a = np.random.rand(50, n)
        b = np.random.rand(n, 60)
        assert np.allclose(np.dot(a,b), dot(a, b, nprocesses=2))
On Linux this multiprocessing version has roughly the same performance as a thread-based solution that releases the GIL (in a C extension) during computations:
    $ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
    100 loops, best of 3: 9.05 msec per loop
    $ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)'
    10 loops, best of 3: 88.8 msec per loop
    $ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
    done slice(49, 50, None)
    ..[snip]..
    done slice(35, 42, None)
    10 loops, best of 3: 82.3 msec per loop
Note: the test was changed to use np.float64 everywhere.
Matrix multiplication means each element of the resulting matrix is calculated separately. That seems like a job for Pool. Since it's homework (and also to follow the SO code) I will only illustrate the use of the Pool itself, not the whole solution.
So, you have to write a routine to calculate the (i, j)-th element of the resulting matrix:
    def getProductElement(m1, m2, i, j):
        # some calculations
        return element
Then you initialize the Pool:
    from multiprocessing import Pool, cpu_count
    pool = Pool(processes=cpu_count())
Then you need to submit the jobs. You can organize them in a matrix, too, but why bother, let's just make a list.
    results = []
    # Here you need to iterate through the rows of the first and the columns of
    # the second matrix. How you do it depends on the implementation (how you
    # store the matrices). Also, make sure you check the dimensions match.
    # The simplest case is if you store each matrix as a list of rows:
    N = len(m1)      # rows of the result
    M = len(m2[0])   # columns of the result
    for i in range(N):
        for j in range(M):
            results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))
Then fill the resulting matrix with the results:
    m = []
    count = 0
    for i in range(N):
        row = []
        for j in range(M):
            row.append(results[count].get())
            count += 1
        m.append(row)
Again, the exact shape of the code depends on how you represent the matrices.