
Using PartitionBy to split and efficiently compute RDD groups by Key

I've implemented a solution to group an RDD[K, V] by key and compute data for each group (K, RDD[V]), using partitionBy and a Partitioner. Nevertheless, I'm not sure it is really efficient and I'd like to have your point of view.

Here is a sample case: given a list of [K: Int, V: Int], compute the mean of the Vs for each group of K, knowing that the computation should be distributed and that the V values may be very numerous. That should give:

List[K, V] => (K, mean(V))

The simple Partitioner class:

class MyPartitioner(maxKey: Int) extends Partitioner {

  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

The partitioning code:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))

val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()

p.foreachPartition(x => {
  try {
    val r = sc.parallelize(x.toList)
    val id = r.first() // get the K of the partition
    val v = r.map(x => x._2)
    println(id._1 + "->" + mean(v))
  } catch {
    case e: UnsupportedOperationException => 0
  }
})

The output is :

1->13, 2->4, 3->7

My questions are :

  1. What really happens when calling partitionBy? (sorry, I didn't find enough specs on it)
  2. Is it really efficient to map by partition, knowing that in my production case there would be few keys (around 50) but very many values per key (around 1 million)?
  3. What is the cost of parallelize(x.toList)? Is it consistent to do it? (I need an RDD as input to mean())
  4. How would you do it yourself?

Regards

Seb asked Feb 09 '15 14:02


1 Answer

Your code should not work. You cannot pass the SparkContext object to the executors (it's not serializable), and I don't see why you would need to.

To calculate the mean, you need to calculate the sum and the count and take their ratio. The default partitioner will do fine.

import org.apache.spark.rdd.RDD

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = {
  case class SumCount(sum: Double, count: Double)
  val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0),
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count))
  // mapValues keeps the key; the values become the per-key means.
  sumCounts.mapValues(sc => sc.sum / sc.count)
}

This is an efficient single-pass calculation that generalizes well.
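For intuition, here is the same sum-and-count aggregation sketched in plain Python (no Spark involved; the fold step mirrors aggregateByKey's seqOp, and merging accumulators across partitions would simply add sums and counts component-wise, like the combOp):

```python
from collections import defaultdict

def mean_by_key(pairs):
    """Plain-Python sketch of the sum-and-count aggregation.

    Each key accumulates a (sum, count) pair, and the mean is
    their ratio at the end -- the same idea as the Spark version.
    """
    acc = defaultdict(lambda: (0.0, 0.0))
    for k, v in pairs:
        s, c = acc[k]
        acc[k] = (s + v, c + 1.0)  # like seqOp: add value, bump count
    # take the ratio per key
    return {k: s / c for k, (s, c) in acc.items()}

print(mean_by_key([(1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)]))
# key 1: (1+8+30)/3 = 13.0, key 2: 4.5, key 3: 7.0
```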

Daniel Darabos answered Oct 01 '22 06:10