I've implemented a solution to group an RDD[K, V] by key and to compute data for each group (K, RDD[V]), using partitionBy and Partitioner. Nevertheless, I'm not sure if it is really efficient and I'd like to have your point of view.
Here is a sample case: given a list of [K: Int, V: Int], compute the mean of the V values for each group of K, knowing that the computation should be distributed and that the set of V values may be very large. The result should be:
List[K, V] => (K, mean(V))
The simple Partitioner class:
class MyPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey
  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}
The partitioning code:
val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))
val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()

p.foreachPartition(x => {
  try {
    val r = sc.parallelize(x.toList)
    val id = r.first() // get the K partition id
    val v = r.map(x => x._2)
    println(id._1 + "->" + mean(v))
  } catch {
    case e: UnsupportedOperationException => 0
  }
})
The output is:
1->13, 2->4, 3->7
My questions are:

1. Is it efficient to use partitionBy? (Sorry, I didn't find enough specs on it.)
2. Is it efficient to call parallelize(x.toList)? Is it consistent to do it? (I need an RDD as input of mean().)

Regards
Your code should not work. You cannot pass the SparkContext object to the executors. (It's not Serializable.) Also I don't see why you would need to.
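For illustration only, here is a minimal sketch of what the per-partition version could look like without touching sc: inside foreachPartition the data already arrives as a plain local Iterator, so the mean can be computed with ordinary Scala code. This assumes the question's custom partitioner, so that each non-empty partition holds a single key:

p.foreachPartition { it =>
  // The iterator is local to the executor; no RDD or SparkContext is needed.
  val values = it.toList
  if (values.nonEmpty) {
    val key  = values.head._1
    val mean = values.map(_._2).sum.toDouble / values.size
    println(key + "->" + mean)
  }
}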
To calculate the mean, you need to calculate the sum and the count and take their ratio. The default partitioner will do fine.
import org.apache.spark.rdd.RDD

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
  case class SumCount(sum: Double, count: Double)
  val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
  // Keep the key and take the ratio of sum to count for each key.
  sumCounts.mapValues(sc => sc.sum / sc.count)
}
This is an efficient single-pass calculation that generalizes well.
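For example, with the sample data from the question (assuming sc is an existing SparkContext, e.g. in spark-shell), it should produce the true per-key means:

val rdd = sc.parallelize(List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)))
meanByKey(rdd).collect().foreach { case (k, m) => println(k + " -> " + m) }
// Expected output (order may vary): 1 -> 13.0, 2 -> 4.5, 3 -> 7.0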