I have a large sparse matrix X in scipy.sparse.csr_matrix format and I would like to multiply it by a numpy array W, making use of parallelism. After some research I discovered that I need to use Array from multiprocessing in order to avoid copying X and W between processes (see e.g. How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:
import multiprocessing
import numpy
import scipy.sparse
import time


def initProcess(data, indices, indptr, shape, Warr, Wshp):
    # Make the shared buffers visible to the worker function as globals.
    global XData
    global XIndices
    global XIndptr
    global Xshape

    XData = data
    XIndices = indices
    XIndptr = indptr
    Xshape = shape

    global WArray
    global WShape

    WArray = Warr
    WShape = Wshp


def dot2(args):
    rowInds, i = args

    # Rebuild the CSR matrix and W as views over the shared buffers.
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)


def getMatmat(X):
    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs + 1), numpy.int64)

    # Store the data in X as RawArray objects so we can share it among processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W):
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess,
                                    initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
        params = []

        for i in range(numJobs):
            params.append((rowInds, i))

        results = pool.map(dot2, params)
        P = numpy.zeros((X.shape[0], W.shape[1]))

        for i in range(numJobs):
            P[rowInds[i]:rowInds[i+1], :] = results[i]

        return P

    return matmat


if __name__ == '__main__':
    # Create a random sparse matrix X and a random dense one W
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time() - startTime

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time() - startTime

    print(parallelTime, nonParallelTime)
However the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.
I believe a slowdown like this can be caused by copying large data to the processes, but that isn't the case here since I use Array to store the shared variables (unless it happens in numpy.frombuffer or when creating the csr_matrix, but I could not find a way to share a csr_matrix directly). One other possible cause of the slow speed is that each process returns a large result of its matrix multiplication, but I am not sure of a way around this.
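One idea I have considered, sketched below but untested, is to also allocate a shared RawArray for the output and let each worker write its block of P into it directly, so that nothing large has to be pickled back to the parent. The names initProcessWithOutput and writeDot are only illustrative, continuing the script above:

def initProcessWithOutput(data, indices, indptr, shape, Warr, Wshp, Pout):
    # Reuse the existing initializer and additionally expose the shared output.
    initProcess(data, indices, indptr, shape, Warr, Wshp)
    global POutput
    POutput = Pout


def writeDot(args):
    rowInds, i = args

    # Rebuild X and W as views over the shared buffers, exactly as in dot2.
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    # View the shared output buffer as the full P matrix and fill our slice
    # in place instead of returning the block through pickling.
    P = numpy.frombuffer(POutput, dtype=numpy.float64).reshape((Xshape[0], WShape[1]))
    P[rowInds[i]:rowInds[i+1], :] = Xr[rowInds[i]:rowInds[i+1], :].dot(W)

The caller would allocate POutput = multiprocessing.RawArray("d", X.shape[0] * W.shape[1]), pass it through initargs, and read the result with numpy.frombuffer after pool.map finishes. Reconstructing the csr_matrix in every call still costs time, so this would only remove the cost of returning the results.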
Can someone see where I am going wrong? Thanks for any help!
Update: I can't be sure, but I think sharing large amounts of data between processes is just not that efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very hard). One way around this is to release the GIL using Cython, for example (see http://docs.cython.org/src/userguide/parallelism.html), although a lot of the numpy functions need to go through the GIL.
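For what it's worth, a thread-based version of the row splitting is short to write with concurrent.futures; whether it actually helps depends entirely on whether the underlying scipy routines release the GIL (many do not), so treat this as an illustration only:

from concurrent.futures import ThreadPoolExecutor

import numpy
import scipy.sparse


def threadedMatmat(X, W, numJobs=4):
    # Threads share X and W without any copying; a speedup only appears if
    # the sparse dot releases the GIL, which it may not in a given scipy version.
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs + 1), numpy.int64)
    P = numpy.zeros((X.shape[0], W.shape[1]))

    def worker(i):
        P[rowInds[i]:rowInds[i+1], :] = X[rowInds[i]:rowInds[i+1], :].dot(W)

    with ThreadPoolExecutor(max_workers=numJobs) as executor:
        # Consume the iterator so any exception raised in a worker surfaces here.
        list(executor.map(worker, range(numJobs)))

    return P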
Your best bet is to drop down to C with Cython. That way you can beat the GIL and use OpenMP. I'm not surprised that multiprocessing is slower -- there's a lot of overhead there.
Here's a naive OpenMP wrapper of CSparse's sparse matrix-vector product code in Python.
On my laptop, this runs a little bit faster than scipy. But I don't have that many cores. The code, including the setup.py script and the C header files and stuff is in this gist: https://gist.github.com/rmcgibbo/6019670
I suspect that if you really want the parallel code to be fast (on my laptop, it's only about 20% faster than single-threaded scipy, even when using 4 threads), you need to think more carefully about where the parallelism happens than I did, paying attention to cache locality.
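The actual setup.py is in the gist; as a rough sketch, a build script along these lines (assuming psparse.pyx sits next to it and the CSparse sources cs_gaxpy.c and cs.h live in src/) would compile the extension with OpenMP:

# setup.py -- rough sketch only; the real script is in the gist above.
# Assumes psparse.pyx next to this file and the CSparse sources in src/.
from setuptools import setup, Extension
from Cython.Build import cythonize
import numpy as np

ext = Extension(
    "psparse",
    sources=["psparse.pyx", "src/cs_gaxpy.c"],
    include_dirs=[np.get_include(), "src"],
    extra_compile_args=["-fopenmp", "-O3"],   # gcc-style OpenMP flags
    extra_link_args=["-fopenmp"],
)

setup(name="psparse", ext_modules=cythonize([ext]))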
# psparse.pyx

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
cimport cython
cimport numpy as np
import numpy as np
import scipy.sparse
from libc.stddef cimport ptrdiff_t
from cython.parallel import parallel, prange

#-----------------------------------------------------------------------------
# Headers
#-----------------------------------------------------------------------------
ctypedef int csi

ctypedef struct cs:
    # matrix in compressed-column or triplet form
    csi nzmax     # maximum number of entries
    csi m         # number of rows
    csi n         # number of columns
    csi *p        # column pointers (size n+1) or col indices (size nzmax)
    csi *i        # row indices, size nzmax
    double *x     # numerical values, size nzmax
    csi nz        # # of entries in triplet matrix, -1 for compressed-col

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil
cdef extern csi cs_print (cs *A, csi brief) nogil

assert sizeof(csi) == 4

#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------
@cython.boundscheck(False)
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None):
    """Multiply a sparse CSC matrix by a dense matrix

    Parameters
    ----------
    X : scipy.sparse.csc_matrix
        A sparse matrix, of size N x M
    W : np.ndarray[dtype=float64, ndim=2, mode='fortran']
        A dense matrix, of size M x P. Note, W must be contiguous and in
        fortran (column-major) order. You can ensure this using
        numpy's `asfortranarray` function.

    Returns
    -------
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran']
        A dense matrix, of size N x P, the result of multiplying X by W.

    Notes
    -----
    This function is parallelized over the columns of W using OpenMP. You
    can control the number of threads at runtime using the OMP_NUM_THREADS
    environment variable. The internal sparse matrix code is from CSPARSE,
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis.
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the
    GNU LGPL v2.1+.

    References
    ----------
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136
    """
    if X.shape[1] != W.shape[0]:
        raise ValueError('matrices are not aligned')

    cdef int i
    cdef cs csX
    cdef np.ndarray[double, ndim=2, mode='fortran'] result
    cdef np.ndarray[csi, ndim=1, mode='c'] indptr = X.indptr
    cdef np.ndarray[csi, ndim=1, mode='c'] indices = X.indices
    cdef np.ndarray[double, ndim=1, mode='c'] data = X.data

    # Pack the scipy data into the CSparse struct. This is just copying some
    # pointers.
    csX.nzmax = X.data.shape[0]
    csX.m = X.shape[0]
    csX.n = X.shape[1]
    csX.p = &indptr[0]
    csX.i = &indices[0]
    csX.x = &data[0]
    csX.nz = -1  # to indicate CSC format

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double)
    for i in prange(W.shape[1], nogil=True):
        # X is in fortran format, so we can get quick access to each of its
        # columns
        cs_gaxpy(&csX, &W[0, i], &result[0, i])

    return result
It calls some C from CSparse.
// src/cs_gaxpy.c
#include "cs.h"

/* y = A*x+y */
csi cs_gaxpy (const cs *A, const double *x, double *y)
{
    csi p, j, n, *Ap, *Ai ;
    double *Ax ;
    if (!CS_CSC (A) || !x || !y) return (0) ;    /* check inputs */
    n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ;
    for (j = 0 ; j < n ; j++)
    {
        for (p = Ap [j] ; p < Ap [j+1] ; p++)
        {
            y [Ai [p]] += Ax [p] * x [j] ;
        }
    }
    return (1) ;
}
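Assuming the compiled extension is importable as psparse (the name used in the build sketch above), a quick sanity check against scipy might look like this:

# Quick check of the OpenMP wrapper against scipy.
import numpy as np
import scipy.sparse
from psparse import pmultiply

X = scipy.sparse.rand(10000, 8000, 0.1).tocsc()   # pmultiply expects CSC with 32-bit indices
W = np.asfortranarray(np.random.rand(8000, 20))   # and a Fortran-ordered float64 W

A = pmultiply(X, W)
B = X.dot(W)
print(np.abs(A - B).max())   # difference should be at round-off level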
Perhaps a bit late with the response. It may be possible to get reliable parallel speedups by using the PyTrilinos package, which provides Python wrappers to many functions in Trilinos. Here is your example converted to use PyTrilinos:
from PyTrilinos import Epetra
from scipy.sparse import rand
import numpy as np
n_rows = 10000
n_cols = 8000
n_vecs = 20
fill_factor = 0.1
comm = Epetra.PyComm()
my_id = comm.MyPID()
row_map = Epetra.Map(n_rows, 0, comm)
out_vec_map = row_map
in_vec_map = Epetra.Map(n_cols, 0, comm)
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm)
n_local_rows = row_map.NumMyElements()
# Create local block matrix in scipy and convert to Epetra
X = rand(n_local_rows, n_cols, fill_factor).tocoo()
A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True)
A.InsertMyValues(X.row, X.col, X.data)
A.FillComplete()
# Create sub-vectors in numpy and convert to Epetra format
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs)
V = Epetra.MultiVector(in_vec_map, n_vecs)
V[:] = W.T # order of indices is opposite
B = Epetra.MultiVector(out_vec_map, n_vecs)
# Multiply
A.Multiply(False, V, B)
You can then run this code with MPI:
mpiexec -n 2 python scipy_to_trilinos.py
Other examples of PyTrilinos can be found in its GitHub repository. Of course, if one were to use PyTrilinos, this way of initializing the matrix via scipy may not be the most optimal.