
How to parallelise scipy sparse matrix multiplication

I have a large sparse matrix X in scipy.sparse.csr_matrix format and I would like to multiply it by a dense numpy array W, making use of parallelism. After some research I found that I need to use Array from multiprocessing in order to avoid copying X and W between processes (see e.g. How to combine Pool.map with Array (shared memory) in Python multiprocessing? and Is shared readonly data copied to different processes for Python multiprocessing?). Here is my latest attempt:

import multiprocessing 
import numpy 
import scipy.sparse 
import time 

def initProcess(data, indices, indptr, shape, Warr, Wshp):
    global XData 
    global XIndices 
    global XIntptr 
    global Xshape 

    XData = data 
    XIndices = indices 
    XIntptr = indptr 
    Xshape = shape 

    global WArray
    global WShape 

    WArray = Warr     
    WShape = Wshp 

def dot2(args):
    rowInds, i = args     

    global XData 
    global XIndices
    global XIntptr 
    global Xshape 

    # Rebuild the csr_matrix from the shared buffers (numpy.frombuffer does not copy)
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)

    global WArray
    global WShape 
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)

    return Xr[rowInds[i]:rowInds[i+1], :].dot(W)

def getMatmat(X): 
    numJobs = multiprocessing.cpu_count()
    # Split the rows of X roughly evenly across the jobs
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)

    #Store the data in X as RawArray objects so we can share it among processes
    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)

    def matmat(W): 
        WArray = multiprocessing.RawArray("d", W.flatten())
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape)) 
        params = [] 

        for i in range(numJobs): 
            params.append((rowInds, i))

        iterator = pool.map(dot2, params)
        P = numpy.zeros((X.shape[0], W.shape[1])) 

        for i in range(numJobs): 
            P[rowInds[i]:rowInds[i+1], :] = iterator[i]

        return P   

    return matmat 

if __name__ == '__main__':
    #Create a random sparse matrix X and a random dense one W     
    X = scipy.sparse.rand(10000, 8000, 0.1)
    X = X.tocsr()
    W = numpy.random.rand(8000, 20)

    startTime = time.time()
    A = getMatmat(X)(W)
    parallelTime = time.time()-startTime 

    startTime = time.time()
    B = X.dot(W)
    nonParallelTime = time.time()-startTime 

    print(parallelTime, nonParallelTime)

However, the output is something like (4.431, 0.165), indicating that the parallel version is much slower than the non-parallel multiplication.

I believe this kind of slowdown is usually caused by copying large data to the processes, but that should not be the case here since I use Array to store the shared variables (unless it happens in numpy.frombuffer or when creating the csr_matrix, as I could not find a way to share a csr_matrix directly). Another possible cause of the slow speed is that each process returns a large result from its block of the multiplication, and I am not sure of a good way around that.
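
One idea I have considered, but not benchmarked, is to have each process write its block of the result directly into a shared output RawArray instead of returning it, so the output never has to be pickled back to the parent. A rough sketch along the lines of the code above (the names are mine and I have not measured whether it is actually faster):

import multiprocessing
import numpy
import scipy.sparse

def initProcess2(data, indices, indptr, shape, Warr, Wshp, Parr):
    # Same shared buffers as before, plus one for the output P
    global XData, XIndices, XIndptr, Xshape, WArray, WShape, PArray
    XData, XIndices, XIndptr, Xshape = data, indices, indptr, shape
    WArray, WShape, PArray = Warr, Wshp, Parr

def dotInPlace(args):
    rowStart, rowEnd = args
    data = numpy.frombuffer(XData, dtype=numpy.float64)
    indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
    indptr = numpy.frombuffer(XIndptr, dtype=numpy.int32)
    Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
    W = numpy.frombuffer(WArray, dtype=numpy.float64).reshape(WShape)
    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape((Xshape[0], WShape[1]))
    # Each job writes a disjoint block of rows, so no locking is needed
    P[rowStart:rowEnd, :] = Xr[rowStart:rowEnd, :].dot(W)

if __name__ == '__main__':
    X = scipy.sparse.rand(10000, 8000, 0.1).tocsr()
    W = numpy.random.rand(8000, 20)

    XData = multiprocessing.RawArray("d", X.data)
    XIndices = multiprocessing.RawArray("i", X.indices)
    XIndptr = multiprocessing.RawArray("i", X.indptr)
    WArray = multiprocessing.RawArray("d", W.flatten())
    PArray = multiprocessing.RawArray("d", X.shape[0]*W.shape[1])

    numJobs = multiprocessing.cpu_count()
    rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int64)
    pool = multiprocessing.Pool(processes=numJobs, initializer=initProcess2,
        initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape, PArray))
    pool.map(dotInPlace, [(rowInds[i], rowInds[i+1]) for i in range(numJobs)])

    P = numpy.frombuffer(PArray, dtype=numpy.float64).reshape((X.shape[0], W.shape[1]))

I do not know whether rebuilding the csr_matrix inside every worker call would still dominate the run time, though.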

Can someone see where I am going wrong? Thanks for any help!

Update: I can't be sure, but I think sharing large amounts of data between processes is just not that efficient, and ideally I should be using multithreading (although the Global Interpreter Lock (GIL) makes that very hard). One way around this is to release the GIL using Cython, for example (see http://docs.cython.org/src/userguide/parallelism.html), although many of the numpy functions need to hold the GIL.

Charanpal asked May 29 '13


2 Answers

Your best bet is to drop down to C with Cython. That way you can beat the GIL and use OpenMP. I'm not surprised that multiprocessing is slower -- there's a lot of overhead there.

Here's a naive OpenMP wrapper of CSparse's sparse matrix-vector product code in Python.

On my laptop, this runs a little faster than scipy, but I don't have that many cores. The code, including the setup.py script and the C header files, is in this gist: https://gist.github.com/rmcgibbo/6019670

I suspect that if you really want the parallel code to be fast (on my laptop, it's only about 20% faster than single-threaded scipy, even when using 4 threads), you need to think more carefully about where the parallelism happens than I did, paying attention to cache locality.

# psparse.pyx

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
cimport cython
cimport numpy as np
import numpy as np
import scipy.sparse
from libc.stddef cimport ptrdiff_t
from cython.parallel import parallel, prange

#-----------------------------------------------------------------------------
# Headers
#-----------------------------------------------------------------------------

ctypedef int csi

ctypedef struct cs:
    # matrix in compressed-column or triplet form
    csi nzmax       # maximum number of entries
    csi m           # number of rows
    csi n           # number of columns
    csi *p          # column pointers (size n+1) or col indices (size nzmax)
    csi *i          # row indices, size nzmax
    double *x       # numerical values, size nzmax
    csi nz          # # of entries in triplet matrix, -1 for compressed-col

cdef extern csi cs_gaxpy (cs *A, double *x, double *y) nogil
cdef extern csi cs_print (cs *A, csi brief) nogil

assert sizeof(csi) == 4

#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------

@cython.boundscheck(False)
def pmultiply(X not None, np.ndarray[ndim=2, mode='fortran', dtype=np.float64_t] W not None):
    """Multiply a sparse CSC matrix by a dense matrix

    Parameters
    ----------
    X : scipy.sparse.csc_matrix
        A sparse matrix, of size N x M
    W : np.ndarray[dtype=float64, ndim=2, mode='fortran']
        A dense matrix, of size M x P. Note, W must be contiguous and in
        fortran (column-major) order. You can ensure this using
        numpy's `asfortranarray` function.

    Returns
    -------
    A : np.ndarray[dtype=float64, ndim=2, mode='fortran']
        A dense matrix, of size N x P, the result of multiplying X by W.

    Notes
    -----
    This function is parallelized over the columns of W using OpenMP. You
    can control the number of threads at runtime using the OMP_NUM_THREADS
    environment variable. The internal sparse matrix code is from CSPARSE, 
    a Concise Sparse matrix package. Copyright (c) 2006, Timothy A. Davis.
    http://www.cise.ufl.edu/research/sparse/CSparse, licensed under the
    GNU LGPL v2.1+.

    References
    ----------
    .. [1] Davis, Timothy A., "Direct Methods for Sparse Linear Systems
    (Fundamentals of Algorithms 2)," SIAM Press, 2006. ISBN: 0898716136
    """
    if X.shape[1] != W.shape[0]:
        raise ValueError('matrices are not aligned')

    cdef int i
    cdef cs csX
    cdef np.ndarray[double, ndim=2, mode='fortran'] result
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indptr  = X.indptr
    cdef np.ndarray[csi, ndim=1, mode = 'c'] indices = X.indices
    cdef np.ndarray[double, ndim=1, mode = 'c']    data = X.data

    # Pack the scipy data into the CSparse struct. This is just copying some
    # pointers.
    csX.nzmax = X.data.shape[0]
    csX.m = X.shape[0]
    csX.n = X.shape[1]
    csX.p = &indptr[0]
    csX.i = &indices[0]
    csX.x = &data[0]
    csX.nz = -1  # to indicate CSC format

    result = np.zeros((X.shape[0], W.shape[1]), order='F', dtype=np.double)
    for i in prange(W.shape[1], nogil=True):
        # W and result are in fortran (column-major) order, so each of their
        # columns is contiguous and we can pass a pointer to it directly
        cs_gaxpy(&csX, &W[0, i], &result[0, i])

    return result

It calls some C from CSparse.

// src/cs_gaxpy.c

#include "cs.h"
/* y = A*x+y */
csi cs_gaxpy (const cs *A, const double *x, double *y)
{
  csi p, j, n, *Ap, *Ai ;
  double *Ax ;
  if (!CS_CSC (A) || !x || !y) return (0) ;       /* check inputs */
  n = A->n ; Ap = A->p ; Ai = A->i ; Ax = A->x ;
  for (j = 0 ; j < n ; j++)
    {
      for (p = Ap [j] ; p < Ap [j+1] ; p++)
        {
          y [Ai [p]] += Ax [p] * x [j] ;
        }
    }
  return (1) ;
}
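
The exact setup.py is in the gist; a minimal sketch of such a build script (with placeholder file and directory names, assuming cs.h lives in a src/ directory) would look something like this:

# setup.py -- minimal sketch; the real script is in the gist linked above
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy as np

ext = Extension(
    "psparse",
    sources=["psparse.pyx", "src/cs_gaxpy.c"],
    include_dirs=[np.get_include(), "src"],   # numpy headers plus cs.h
    extra_compile_args=["-fopenmp", "-O3"],   # OpenMP is needed for prange
    extra_link_args=["-fopenmp"],
)

setup(name="psparse", cmdclass={"build_ext": build_ext}, ext_modules=[ext])

Build in place with python setup.py build_ext --inplace, then call it as psparse.pmultiply(X.tocsc(), np.asfortranarray(W)).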
answered by Robert T. McGibbon


Perhaps a bit late with the response, but it may be possible to get reliable parallel speedups by using the PyTrilinos package, which provides Python wrappers for many functions in Trilinos. Here is your example converted to use PyTrilinos:

from PyTrilinos import Epetra
from scipy.sparse import rand
import numpy as np

n_rows = 10000
n_cols = 8000
n_vecs = 20
fill_factor = 0.1

comm = Epetra.PyComm()
my_id = comm.MyPID()

row_map = Epetra.Map(n_rows, 0, comm)
out_vec_map = row_map
in_vec_map = Epetra.Map(n_cols, 0, comm)
col_map = Epetra.Map(n_cols, range(n_cols), 0, comm)

n_local_rows = row_map.NumMyElements()

# Create local block matrix in scipy and convert to Epetra
X = rand(n_local_rows, n_cols, fill_factor).tocoo()

A = Epetra.CrsMatrix(Epetra.Copy, row_map, col_map, int(fill_factor*n_cols*1.2), True)
A.InsertMyValues(X.row, X.col, X.data)
A.FillComplete()

# Create sub-vectors in numpy and convert to Epetra format 
W = np.random.rand(in_vec_map.NumMyElements(), n_vecs)
V = Epetra.MultiVector(in_vec_map, n_vecs)

V[:] = W.T # order of indices is opposite

B = Epetra.MultiVector(out_vec_map, n_vecs)

# Multiply
A.Multiply(False, V, B)

You can then run this code using MPI:

mpiexec -n 2 python scipy_to_trilinos.py

Other examples of PyTrilinos can be found in the GitHub repository here. Of course, if one were to use PyTrilinos, this way of initializing the matrix via scipy may not be the most optimal.

answered by hakeem