I have a 20,000 x 515 numpy matrix representing biological data. I need to find the correlation coefficients of the biological data, meaning that as a result I would have a 20,000 x 20,000 matrix of correlation values. Then I populate a numpy array with 1's and 0's depending on whether each correlation coefficient is greater than a threshold value.
I used numpy.corrcoef to find the correlation coefficients and it works well on my machine.
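In other words, the single-machine version is essentially the following (just a sketch; 'MyData.csv' and the 0.9 threshold are from my actual setup, the variable names are placeholders):

import numpy
data=numpy.genfromtxt('MyData.csv')                  # 20,000 x 515
corr=numpy.corrcoef(data)                            # 20,000 x 20,000
adjacency=(numpy.absolute(corr)>=0.9).astype(int)    # 1 if |corr| >= threshold, else 0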
Then I wanted to run it on the cluster (10 computers, with the number of nodes varying from 2 to 8). When I tried it on the cluster, with each node generating 40 random numbers and taking those 40 random columns from the biological data (resulting in a 20,000 x 40 matrix), I ran into a memory issue saying:
mpirun noticed that process rank # with PID # on node name exited on signal 9 (Killed).
Then I tried rewriting the program to work row by row: for each pair of rows, compute the correlation coefficient and store 1 in the matrix if the value is greater than the threshold, or 0 otherwise, rather than building the full correlation matrix. But it takes 1.30 hrs to run this program, and I need to run it 100 times.
Can anyone please suggest a better way of doing this, for example how to solve the memory issue by allocating jobs once each core has finished its job? I am new to MPI. Below is my code.
If you need any more information, please let me know. Thank you.
import numpy
from mpi4py import MPI
import time

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
RandomNumbers[Rank]=rndm_indx;

CORR_CR=numpy.zeros((Data.shape[0],Data.shape[0]));

start=time.time();
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i]);
    alpha1=1./numpy.linalg.norm(Data[i]);
    for j in range(i,Data.shape[0]):
        if(i==j):
            CORR_CR[i][j]=1;
        else:
            Data[j]=Data[j]-numpy.mean(Data[j]);
            alpha2=1./numpy.linalg.norm(Data[j]);
            corr=numpy.inner(Data[i],Data[j])*(alpha1*alpha2);
            corr=int(numpy.absolute(corr)>=0.9)
            CORR_CR[i][j]=CORR_CR[j][i]=corr
end=time.time();

CORR_CR=CORR_CR-numpy.eye(CORR_CR.shape[0]);
elapsed=(end-start)
print('Total Time',elapsed)
The execution time of the program you posted is about 96s on my computer. Let's optimize a couple of things before exploring parallel computations.
Let's store the norms of the vectors to avoid computing them each time they are needed. Moving alpha1=1./numpy.linalg.norm(Data[i]); out of the inner loop is a good starting point. Since the vectors do not change during the computations, their norms can be computed in advance:
alpha=numpy.zeros(Data.shape[0])
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])
    alpha[i]=1./numpy.linalg.norm(Data[i])

for i in range(0,Data.shape[0]):
    for j in range(i,Data.shape[0]):
        if(i==j):
            CORR_CR[i][j]=1;
        else:
            corr=numpy.inner(Data[i],Data[j])*(alpha[i]*alpha[j]);
            corr=int(numpy.absolute(corr)>=0.9)
            CORR_CR[i][j]=CORR_CR[j][i]=corr
The computation time is already down to 17s.
Assuming that the vectors are not highly correlated, most of the correlation coefficients will be rounded to zero. Hence, the matrix is likely to be sparse (mostly zeros). Let's use the scipy.sparse.coo_matrix sparse matrix format, which is very easy to populate: the non-zero items and their coordinates i,j are simply stored in lists.
data=[]
ii=[]
jj=[]
...
            if(corr!=0):
                data.append(corr)
                ii.append(i)
                jj.append(j)
                data.append(corr)
                ii.append(j)
                jj.append(i)
...
CORR_CR=scipy.sparse.coo_matrix((data,(ii,jj)), shape=(Data.shape[0],Data.shape[0]))
The computation time is down to 13s (a modest improvement?), but the memory footprint is greatly reduced. This would be a major improvement if larger datasets were to be considered.
for loops in Python are pretty inefficient. See for instance for loop in python is 10x slower than matlab. But there are plenty of ways around this, such as using vectorized operations or optimized iterators such as those provided by numpy.nditer (a vectorized sketch is shown below). One of the reasons for for loops being inefficient is that Python is an interpreted language: no compilation occurs in the process. Hence, to overcome this problem, the trickiest part of the code can be compiled by using a tool like Cython, which is the route followed in the rest of this answer.
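For illustration, the vectorized route could look like the following sketch (not what the rest of this answer does; it assumes the rows of Data are already centered and have non-zero norms, block_size is an arbitrary choice, and the result is still a dense 0/1 matrix, which is large for 20,000 rows):

#compute all inverse norms at once and normalize the rows
norms=numpy.linalg.norm(Data,axis=1)
normalized=Data/norms[:,None]
#process the rows by blocks to avoid allocating the full float correlation matrix at once
block_size=1000
adjacency=numpy.zeros((Data.shape[0],Data.shape[0]),dtype=numpy.int8)
for start in range(0,Data.shape[0],block_size):
    stop=min(start+block_size,Data.shape[0])
    block_corr=normalized[start:stop].dot(normalized.T)    #correlations of one block of rows
    adjacency[start:stop]=numpy.absolute(block_corr)>=0.9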
The Cython workflow is as follows:
- The critical part of the code is written in Cython, in a dedicated file correlator.pyx.
- This file is turned into a correlator.c file by Cython.
- The correlator.c file is compiled by your favorite C compiler (gcc) to build a shared library correlator.so.
- The optimized function can then be used in your program after import correlator.
The content of correlator.pyx, adapted from Numpy vs Cython speed, looks like:
import numpy
cimport numpy
cimport scipy.linalg.cython_blas
ctypedef numpy.float64_t DTYPE_t
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)
def process(numpy.ndarray[DTYPE_t, ndim=2] array,numpy.ndarray[DTYPE_t, ndim=1] alpha,int imin,int imax):

    cdef unsigned int rows = array.shape[0]
    cdef int cols = array.shape[1]
    cdef unsigned int row, row2
    cdef int one=1
    ii=[]
    jj=[]
    data=[]

    for row in range(imin,imax):
        for row2 in range(row,rows):
            if row==row2:
                data.append(0)
                ii.append(row)
                jj.append(row2)
            else:
                corr=scipy.linalg.cython_blas.ddot(&cols,&array[row,0],&one,&array[row2,0],&one)*alpha[row]*alpha[row2]
                corr=int(numpy.absolute(corr)>=0.9)
                if(corr!=0):
                    data.append(corr)
                    ii.append(row)
                    jj.append(row2)
                    data.append(corr)
                    ii.append(row2)
                    jj.append(row)
    return ii,jj,data
where scipy.linalg.cython_blas.ddot() performs the inner product.
To cythonize and compile the .pyx file, the following makefile can be used (I hope you are using Linux...):

all: correlator correlatorb

correlator: correlator.pyx
	cython -a correlator.pyx

correlatorb: correlator.c
	gcc -shared -pthread -fPIC -fwrapv -O2 -Wall -fno-strict-aliasing -I/usr/include/python2.7 -o correlator.so correlator.c
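As an alternative to the makefile (just an equivalent build recipe, not required by the rest of the answer), a small setup.py can cythonize and compile in one step; the numpy include directory is added because correlator.pyx cimports numpy:

#setup.py -- build with: python setup.py build_ext --inplace
from distutils.core import setup
from Cython.Build import cythonize
import numpy

setup(ext_modules=cythonize("correlator.pyx"),
      include_dirs=[numpy.get_include()])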
The new correlation function is called in the main python file by:
import correlator
ii,jj,data=correlator.process(Data,alpha,0,Data.shape[0])
By using a compiled loop, the time is down to 5.4s! That is already ten times faster. Moreover, this computation is performed on a single process!
Let's introduce parallel computations.
Please notice that two arguments were added to the function process: imin and imax. These arguments define the range of rows of CORR_CR that are computed. This is done so as to anticipate the use of parallel computations. Indeed, a straightforward way to parallelize the program is to split the outer for loop (index i) across the different processes.
Each process performs the outer for loop for a particular range of the index i, which is chosen so as to balance the workload between the processes.
The program has to complete the following operations:
- The root process reads Data from the file.
- Data is broadcast to all processes by using the MPI function bcast().
- The range of index i to be handled by each process is computed.
- Each process computes its own lists data, ii, jj.
- The partial results are gathered on the root process by using gather(). This produces three lists of Size lists, which are concatenated to obtain the 3 lists required to create the sparse adjacency matrix.
Here goes the code:
import numpy
from mpi4py import MPI
import time
import scipy.sparse
import warnings
warnings.simplefilter('ignore',scipy.sparse.SparseEfficiencyWarning)

Size=MPI.COMM_WORLD.Get_size();
Rank=MPI.COMM_WORLD.Get_rank();
Name=MPI.Get_processor_name();

#a dummy set of data is created here.
#Samples such that (i-j)%10==0 are highly correlated.
RandomNumbers={};
rndm_indx=numpy.random.choice(range(515),40,replace=False)
rndm_indx=numpy.sort(rndm_indx)

Data=numpy.ascontiguousarray(numpy.zeros((2000,515),dtype=numpy.float64))
if Rank==0:
    #Data=numpy.genfromtxt('MyData.csv',usecols=rndm_indx);
    Data=numpy.ascontiguousarray(numpy.random.rand(2000,515))
    lin=numpy.linspace(0.,1.,515)
    for i in range(Data.shape[0]):
        Data[i]+=numpy.sin((1+i%10)*10*lin)*100

start=time.time();

#broadcasting the matrix
Data=MPI.COMM_WORLD.bcast(Data, root=0)
RandomNumbers[Rank]=rndm_indx;
print Data.shape[0]

#an array to store the inverse of the norm of each sample
alpha=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(0,Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])
    if numpy.linalg.norm(Data[i])==0:
        print "process "+str(Rank)+" is facing a big problem"
    else:
        alpha[i]=1./numpy.linalg.norm(Data[i])

#balancing the computational load between the processes.
#Each process must perform about Data.shape[0]*Data.shape[0]/(2*Size) correlations.
#Each process cares for a set of rows.
#Of course, the last rank must care about more rows than the first one.
ilimits=numpy.zeros(Size+1,numpy.int32)
if Rank==0:
    nbtaskperprocess=Data.shape[0]*Data.shape[0]/(2*Size)
    icurr=0
    for i in range(Size):
        nbjob=0
        while(nbjob<nbtaskperprocess and icurr<=Data.shape[0]):
            nbjob+=(Data.shape[0]-icurr)
            icurr+=1
        ilimits[i+1]=icurr
    ilimits[Size]=Data.shape[0]
ilimits=MPI.COMM_WORLD.bcast(ilimits, root=0)

#the "local" job has been cythonized in file correlator.pyx
import correlator
ii,jj,data=correlator.process(Data,alpha,ilimits[Rank],ilimits[Rank+1])

#gathering the matrix inputs from every process on the root process.
data = MPI.COMM_WORLD.gather(data, root=0)
ii = MPI.COMM_WORLD.gather(ii, root=0)
jj = MPI.COMM_WORLD.gather(jj, root=0)

if Rank==0:
    #concatenate the lists
    data2=sum(data,[])
    ii2=sum(ii,[])
    jj2=sum(jj,[])
    #create the adjacency matrix
    CORR_CR=scipy.sparse.coo_matrix((data2,(ii2,jj2)), shape=(Data.shape[0],Data.shape[0]))
    print CORR_CR

end=time.time();
elapsed=(end-start)
print('Total Time',elapsed)
By running mpirun -np 4 python main.py, the computation time is 3.4s. That's not 4 times faster... This is likely due to the fact that the bottleneck of the computation is the scalar products, which require a large memory bandwidth, and my personal computer is really limited regarding memory bandwidth...
Last comment: there are plenty of possibilities for improvement.
- The vectors in Data are copied to every process... This affects the memory required by the program. Dispatching the computation in a different way and trading memory against communications could overcome this problem...
- Each process still computes the norms of all the vectors... This could be improved by having each process compute the norms of some of the vectors and using the MPI function allreduce() on alpha... A minimal sketch of this idea follows.
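The sketch below assumes the same Data, Rank and ilimits as in the code above (the row ranges are only an approximation of a balanced split for the norms, and ilimits must already be available):

#each process still centers all rows (cheap), but fills alpha only for its own row range
for i in range(Data.shape[0]):
    Data[i]=Data[i]-numpy.mean(Data[i])
alpha_local=numpy.zeros(Data.shape[0],dtype=numpy.float64)
for i in range(ilimits[Rank],ilimits[Rank+1]):
    nrm=numpy.linalg.norm(Data[i])
    if nrm!=0:
        alpha_local[i]=1./nrm
#sum the partial arrays over all processes; every process ends up with the full alpha
alpha=numpy.zeros_like(alpha_local)
MPI.COMM_WORLD.Allreduce(alpha_local,alpha,op=MPI.SUM)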
What to do with this adjacency matrix?
I think you already know the answer to this question, but you can provide this adjacency matrix to scipy.sparse.csgraph routines such as connected_components() or laplacian(). Indeed, you are not very far from spectral clustering!
For instance, if the clusters are obvious, using connected_components() is sufficient:
import scipy.sparse.csgraph

if Rank==0:
    # coo to csr format
    S=CORR_CR.tocsr()
    print S
    n_components, labels=scipy.sparse.csgraph.connected_components(S, directed=False, connection='weak', return_labels=True)
    print "number of different families "+str(n_components)
    numpy.set_printoptions(threshold=numpy.nan)
    print labels
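The sizes of the resulting families can then be read directly from the labels array, for instance (still on the root process):

    #number of samples in each family/connected component
    sizes=numpy.bincount(labels)
    print sizes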