
Matrix Transpose on RowMatrix in Spark

Tags:

apache-spark

Suppose I have a RowMatrix.

  1. How can I transpose it? The API documentation does not seem to have a transpose method.
  2. Matrix has a transpose() method, but it is not distributed. If I have a large matrix that does not fit in memory, how can I transpose it?
  3. I have converted a RowMatrix to a DenseMatrix as follows:

    DenseMatrix Mat = new DenseMatrix(m,n,MatArr);
    

    which requires converting the RowMatrix to a JavaRDD and then converting the JavaRDD to an array.

Is there a more convenient way to do the conversion?

Thanks in advance

Chandan asked May 31 '15 10:05


2 Answers

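Getting the transpose of a RowMatrix in Java. Note that this collects every row on the driver, so it only works when the whole matrix fits in driver memory:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

public static RowMatrix transposeRM(JavaSparkContext jsc, RowMatrix mat) {
    int numRows = (int) mat.numRows();
    int numCols = (int) mat.numCols();

    // Pull all rows to the driver (this step is not distributed).
    List<Vector> rows = mat.rows().toJavaRDD().collect();

    // tmp[j][i] holds element (i, j) of the original matrix.
    double[][] tmp = new double[numCols][numRows];
    for (int i = 0; i < rows.size(); i++) {
        double[] row = rows.get(i).toArray();
        for (int j = 0; j < numCols; j++) {
            tmp[j][i] = row[j];
        }
    }

    // Each row of tmp becomes one row of the transposed RowMatrix.
    List<Vector> transposedRows = new ArrayList<Vector>();
    for (int i = 0; i < numCols; i++) {
        transposedRows.add(Vectors.dense(tmp[i]));
    }

    JavaRDD<Vector> rdd = jsc.parallelize(transposedRows);
    return new RowMatrix(rdd.rdd());
}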
Abdelmonem Mahmoud Amer answered Sep 23 '22 19:09


For a very large and sparse matrix (like the one you get from text feature extraction), the easiest way is to go through a CoordinateMatrix:

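import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

def transposeRowMatrix(m: RowMatrix): RowMatrix = {
  // Attach an explicit row index to every row.
  val indexedRM = new IndexedRowMatrix(m.rows.zipWithIndex.map {
    case (row, idx) => new IndexedRow(idx, row)
  })
  // Transpose at the level of (row, column, value) entries.
  val transposed = indexedRM.toCoordinateMatrix().transpose.toIndexedRowMatrix()
  // Restore the row order by index before dropping the index.
  new RowMatrix(transposed.rows
    .map(idxRow => (idxRow.index, idxRow.vector))
    .sortByKey()
    .map(_._2))
}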
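
To see why the coordinate route suits sparse data, it helps to look at what the transpose does to individual entries: a coordinate matrix holds (row, column, value) triples, and transposing swaps the row and column of each triple, so the work scales with the number of stored entries rather than with rows times columns. The following is a minimal plain-Java sketch of that idea without Spark. The `Entry` class and the helper names are hypothetical and only mimic the concept; they are not Spark's API, and storing only nonzero cells is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CoordinateTransposeSketch {

    /** One stored entry (row, column, value); hypothetical stand-in for a coordinate-matrix entry. */
    static final class Entry {
        final long row;
        final long col;
        final double value;

        Entry(long row, long col, double value) {
            this.row = row;
            this.col = col;
            this.value = value;
        }
    }

    /** Keeps only the nonzero cells of a dense array as entries. */
    static List<Entry> toEntries(double[][] rows) {
        List<Entry> entries = new ArrayList<>();
        for (int i = 0; i < rows.length; i++) {
            for (int j = 0; j < rows[i].length; j++) {
                if (rows[i][j] != 0.0) {
                    entries.add(new Entry(i, j, rows[i][j]));
                }
            }
        }
        return entries;
    }

    /** Transposes by swapping the row and column of every entry. */
    static List<Entry> transpose(List<Entry> entries) {
        List<Entry> swapped = new ArrayList<>(entries.size());
        for (Entry e : entries) {
            swapped.add(new Entry(e.col, e.row, e.value));
        }
        return swapped;
    }

    /** Rebuilds a dense array; cells without an entry stay 0.0. */
    static double[][] toDense(List<Entry> entries, int numRows, int numCols) {
        double[][] dense = new double[numRows][numCols];
        for (Entry e : entries) {
            dense[(int) e.row][(int) e.col] = e.value;
        }
        return dense;
    }

    public static void main(String[] args) {
        double[][] m = {
            {0, 1, 0, 7, 0},
            {0, 0, 2, 7, 0}
        };
        List<Entry> entries = toEntries(m); // only the nonzero cells are stored
        double[][] t = toDense(transpose(entries), 5, 2);
        System.out.println(entries.size() + " stored entries");
        System.out.println(Arrays.deepToString(t));
    }
}
```

In this small case only 4 of the 10 cells are stored and swapped; the denser the matrix, the smaller that saving becomes.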

For a matrix that is not so sparse, you can use BlockMatrix as the bridge, as mentioned in aletapool's answer.

However, aletapool's answer misses a very important point: when you go RowMatrix -> IndexedRowMatrix -> BlockMatrix -> transpose -> BlockMatrix -> IndexedRowMatrix -> RowMatrix, you have to sort by index in the last step (IndexedRowMatrix -> RowMatrix). By default, converting an IndexedRowMatrix to a RowMatrix simply drops the index, so the rows can end up in the wrong order.

import org.apache.spark.mllib.linalg.{Vectors => MllibVectors} // alias assumed from the name used below

val data = Array(
  MllibVectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  MllibVectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  MllibVectors.dense(4.0, 0.0, 0.0, 6.0, 7.0),
  MllibVectors.sparse(5, Seq((2, 2.0), (3, 7.0))))

val dataRDD = sc.parallelize(data, 4)

val testMat: RowMatrix = new RowMatrix(dataRDD)
testMat.rows.collect().map(_.toDense).foreach(println)

Output:

[0.0,1.0,0.0,7.0,0.0]
[2.0,0.0,3.0,4.0,5.0]
[4.0,0.0,0.0,6.0,7.0]
[0.0,0.0,2.0,7.0,0.0]

transposeRowMatrix(testMat).rows.collect().map(_.toDense).foreach(println)

Output:

[0.0,2.0,4.0,0.0]
[1.0,0.0,0.0,0.0]
[0.0,3.0,0.0,2.0]
[7.0,4.0,6.0,7.0]
[0.0,5.0,7.0,0.0]
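
The point about sorting before dropping the index can be illustrated without Spark. The following plain-Java sketch (Java because the question uses it) tags each transposed row with its index, scrambles the rows on purpose to imitate a distributed collection that gives no order guarantee, and then drops the index with and without sorting first. `IndexedRow` here is a hypothetical stand-in class, not Spark's, and the reversal is only an assumed stand-in for whatever order a shuffle happens to produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class TransposeOrderSketch {

    /** A row tagged with its row index (hypothetical stand-in for an indexed row). */
    static final class IndexedRow {
        final long index;
        final double[] values;

        IndexedRow(long index, double[] values) {
            this.index = index;
            this.values = values;
        }
    }

    /** Transposes a dense array and returns the new rows, tagged by index, in scrambled order. */
    static List<IndexedRow> transposeIndexed(double[][] rows) {
        int numRows = rows.length;
        int numCols = rows[0].length;
        List<IndexedRow> out = new ArrayList<>();
        for (int j = 0; j < numCols; j++) {
            double[] newRow = new double[numRows];
            for (int i = 0; i < numRows; i++) {
                newRow[i] = rows[i][j];
            }
            out.add(new IndexedRow(j, newRow));
        }
        // Assumption of the sketch: reversing imitates rows arriving out of order.
        Collections.reverse(out);
        return out;
    }

    /** Drops the index; when sortFirst is true, sorts by index before doing so. */
    static double[][] dropIndex(List<IndexedRow> rows, boolean sortFirst) {
        List<IndexedRow> copy = new ArrayList<>(rows);
        if (sortFirst) {
            copy.sort(Comparator.comparingLong(r -> r.index));
        }
        double[][] result = new double[copy.size()][];
        for (int k = 0; k < copy.size(); k++) {
            result[k] = copy.get(k).values;
        }
        return result;
    }

    public static void main(String[] args) {
        double[][] m = {
            {0, 1, 0, 7, 0},
            {2, 0, 3, 4, 5},
            {4, 0, 0, 6, 7},
            {0, 0, 2, 7, 0}
        };
        List<IndexedRow> transposed = transposeIndexed(m);
        System.out.println("sorted:   " + Arrays.deepToString(dropIndex(transposed, true)));
        System.out.println("unsorted: " + Arrays.deepToString(dropIndex(transposed, false)));
    }
}
```

With the sort, the rows come back in the same order as the transposed output shown above. Without it, the same rows come back reversed, which is no longer the transpose of the input.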
Y.G. answered Sep 24 '22 19:09