
How do I make processes able to write in an array of the main program?

I am making a process pool, and each process needs to write to different parts of a matrix that exists in the main program. There is no risk of overwriting information, since each process will work with different rows of the matrix. How can I make the matrix writable from within the processes?

The program is a matrix multiplier that a professor assigned me, and it has to be multiprocessed. It will create a process for every core the computer has. The main program will send different parts of the matrix to the processes, they will compute them, and then they will return the results in a way that lets me identify which response corresponds to which row it was based on.

asked Mar 16 '12 by user1249212


3 Answers

Have you tried using the multiprocessing.Array class to set up some shared memory?

See also the example from the docs:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

Just extend this to a matrix of size h*w with i*w+j-style indexing. Then, add multiple processes using a Process Pool.
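For example, here is a minimal sketch of that idea (the helper names init_worker and fill_row are mine, and the per-row computation is just a placeholder): a flat shared Array backs an h*w matrix, it is handed to the workers through the Pool initializer, and each worker writes only its own rows, so no locking is needed.

from multiprocessing import Pool, Array, cpu_count

def init_worker(shared, width):
    # runs once in each worker process: keep references to the shared buffer
    global out, w
    out, w = shared, width

def fill_row(i):
    # each task writes only row i, so no two processes touch the same cells
    for j in range(w):
        out[i * w + j] = i * j          # placeholder computation
    return i

if __name__ == '__main__':
    h, w = 4, 5
    shared = Array('d', h * w, lock=False)   # flat buffer of h*w doubles, no lock needed
    pool = Pool(cpu_count(), initializer=init_worker, initargs=(shared, w))
    for i in pool.imap_unordered(fill_row, range(h)):
        print("row %d done" % i)
    pool.close()
    pool.join()
    # element (i, j) lives at index i*w + j
    print([shared[i * w:(i + 1) * w] for i in range(h)])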

answered by moooeeeep


The cost of creating new processes, or of copying matrices between them if the processes are reused, overshadows the cost of the matrix multiplication itself. In any case, numpy.dot() can utilize multiple CPU cores by itself.

Matrix multiplication can be distributed between processes by computing different rows of the result in different processes. For example, given input matrices a and b, the (i, j) element of the result is:

out[i,j] = sum(a[i,:] * b[:,j])

So the i-th row can be computed as follows (in the code below, i is actually a slice selecting one or more rows):

import numpy as np

def dot_slice(a, b, out, i):
    t = np.empty_like(a[i,:])  # scratch buffer, reused for every column j
    for j in xrange(b.shape[1]):
        # out[i,j] = sum(a[i,:] * b[:,j])
        np.multiply(a[i,:], b[:,j], t).sum(axis=1, out=out[i,j])

A numpy array accepts a slice as an index, e.g., a[1:3,:] returns the 2nd and 3rd rows.
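For instance, here is an illustrative call (using the dot_slice defined above; the sample arrays are only for demonstration) that fills two rows of the output at once by passing a slice object:

import numpy as np

a = np.random.rand(4, 3)
b = np.random.rand(3, 5)
out = np.zeros((4, 5))

dot_slice(a, b, out, slice(0, 2))       # fills rows 0 and 1 of out in place
assert np.allclose(out[:2], np.dot(a, b)[:2])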

a and b are read-only, so they can be inherited as-is by the child processes (exploiting copy-on-write fork() on Linux); the result is computed in a shared array, and only the slice indexes are copied during the computation:

import ctypes
import multiprocessing as mp

def dot(a, b, nprocesses=mp.cpu_count()):
    """Perform matrix multiplication using multiple processes."""
    if (a.shape[1] != b.shape[0]):
        raise ValueError("wrong shape")

    # create shared array
    mp_arr = mp.RawArray(ctypes.c_double, a.shape[0]*b.shape[1])

    # start processes
    np_args = mp_arr, (a.shape[0], b.shape[1]), a.dtype
    pool = mp.Pool(nprocesses, initializer=init, initargs=(a, b)+np_args)

    # perform multiplication
    for i in pool.imap_unordered(mpdot_slice, slices(a.shape[0], nprocesses)):
        print("done %s" % (i,))
    pool.close()
    pool.join()

    # return result
    return tonumpyarray(*np_args)

Where:

def mpdot_slice(i):
    dot_slice(ga, gb, gout, i)
    return i

def init(a, b, *np_args):
    """Called on each child process initialization."""
    global ga, gb, gout
    ga, gb = a, b
    gout = tonumpyarray(*np_args)

def tonumpyarray(mp_arr, shape, dtype):
    """Convert shared multiprocessing array to numpy array.

    no data copying
    """
    return np.frombuffer(mp_arr, dtype=dtype).reshape(shape)

def slices(nitems, mslices):
    """Split nitems on mslices pieces.

    >>> list(slices(10, 3))
    [slice(0, 4, None), slice(4, 8, None), slice(8, 10, None)]
    >>> list(slices(1, 3))
    [slice(0, 1, None), slice(1, 1, None), slice(2, 1, None)]
    """
    step = nitems // mslices + 1
    for i in xrange(mslices):
        yield slice(i*step, min(nitems, (i+1)*step))

To test it:

def test():
    n = 100000
    a = np.random.rand(50, n)
    b = np.random.rand(n, 60)
    assert np.allclose(np.dot(a,b), dot(a,b, nprocesses=2))

On Linux, this multiprocessing version has the same performance as a solution that uses threads and releases the GIL (in a C extension) during the computations:

$ python -mtimeit -s'from test_cydot import a,b,out,np' 'np.dot(a,b,out)'
100 loops, best of 3: 9.05 msec per loop

$ python -mtimeit -s'from test_cydot import a,b,out,cydot' 'cydot.dot(a,b,out)' 
10 loops, best of 3: 88.8 msec per loop

$ python -mtimeit -s'from test_cydot import a,b; import mpdot' 'mpdot.dot(a,b)'
done slice(49, 50, None)
..[snip]..
done slice(35, 42, None)
10 loops, best of 3: 82.3 msec per loop

Note: the test was changed to use np.float64 everywhere.

answered by jfs


Matrix multiplication means each element of the resulting matrix is calculated separately. That seems like a job for Pool. Since it's homework (and also to follow the SO code of conduct), I will only illustrate the use of the Pool itself, not the whole solution.

So, you have to write a routine to calculate the (i, j)-th element of the resulting matrix:

def getProductElement(m1, m2, i, j):
    # some calculations
    return element

Then you initialize the Pool:

from multiprocessing import Pool, cpu_count
pool = Pool(processes=cpu_count())

Then you need to submit the jobs. You can organize them in a matrix, too, but why bother, let's just make a list.

results = []
# Here you need to iterate through the rows of the first and the columns of
# the second matrix. How you do it depends on the implementation (how you store
# the matrices). Also, make sure you check that the dimensions are compatible.
# The simplest case is if each matrix is stored as a list of rows:

N = len(m1)
M = len(m2[0])
for i in range(N):
    for j in range(M):
        results.append(pool.apply_async(getProductElement, (m1, m2, i, j)))

Then fill the resulting matrix with the results:

m = []
count = 0
for i in range(N):
    row = []
    for j in range(M):
        row.append(results[count].get())
        count += 1
    m.append(row)

Again, the exact shape of the code depends on how you represent the matrices.
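Putting the pieces together, one possible complete sketch (assuming both matrices are stored as lists of rows; the body of getProductElement here is a naive dot product that I filled in for illustration, not part of the original answer):

from multiprocessing import Pool, cpu_count

def getProductElement(m1, m2, i, j):
    # (i, j)-th element: dot product of the i-th row of m1 and the j-th column of m2
    return sum(m1[i][k] * m2[k][j] for k in range(len(m2)))

def multiply(m1, m2):
    if len(m1[0]) != len(m2):
        raise ValueError("incompatible dimensions")
    N, M = len(m1), len(m2[0])
    pool = Pool(processes=cpu_count())
    results = [pool.apply_async(getProductElement, (m1, m2, i, j))
               for i in range(N) for j in range(M)]
    pool.close()
    pool.join()
    # rebuild the N x M result row by row
    return [[results[i * M + j].get() for j in range(M)] for i in range(N)]

if __name__ == '__main__':
    print(multiply([[1, 2], [3, 4]], [[5, 6], [7, 8]]))   # [[19, 22], [43, 50]]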

answered by Lev Levitsky