Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to compute the distance matrix in spark?

I have tried pairing the samples but it costs huge amount of memory as 100 samples leads to 9900 samples which is more costly. What could be the more effective way of computing distance matrix in distributed environment in spark

Here is a snippet of pseudo code what i'm trying

val input = (sc.textFile("AirPassengers.csv",(numPartitions/2)))
val i = input.map(s => (Vectors.dense(s.split(',').map(_.toDouble))))
val indexed = i.zipWithIndex()                                                                       //Including the index of each sample
val indexedData = indexed.map{case (k,v) => (v,k)}

val pairedSamples = indexedData.cartesian(indexedData)

val filteredSamples = pairedSamples.filter{ case (x,y) =>
(x._1.toInt > y._1.toInt)  //to consider only the upper or lower trainagle
 }
filteredSamples.cache
filteredSamples.count

Above code creates the pairs but even if my dataset contains 100 samples, by pairing filteredSamples (above) results in 4950 sample which could be very costly for big data

like image 692
Manoj Kondapaka Avatar asked Jun 14 '16 10:06

Manoj Kondapaka


1 Answers

I recently answered a similar question.

Basically, it will arrive to computing n(n-1)/2 pairs, which would be 4950 computations in your example. However, what makes this approach different is that I use joins instead of cartesian. With your code, the solution would look like this:

val input = (sc.textFile("AirPassengers.csv",(numPartitions/2)))
val i = input.map(s => (Vectors.dense(s.split(',').map(_.toDouble))))
val indexed = i.zipWithIndex()

// including the index of each sample
val indexedData = indexed.map { case (k,v) => (v,k) } 

// prepare indices
val count = i.count
val indices = sc.parallelize(for(i <- 0L until count; j <- 0L until count; if i > j) yield (i, j))

val joined1 = indices.join(indexedData).map { case (i, (j, v)) => (j, (i,v)) }
val joined2 = joined1.join(indexedData).map { case (j, ((i,v1),v2)) => ((i,j),(v1,v2)) }

// after that, you can then compute the distance using your distFunc
val distRDD = joined2.mapValues{ case (v1, v2) => distFunc(v1, v2) }

Try this method and compare it with the one you already posted. Hopefully, this can speedup your code a bit.

like image 127
jtitusj Avatar answered Nov 15 '22 16:11

jtitusj