Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark : Average of values instead of sum in reduceByKey using Scala

When reduceByKey is called it sums all values with same key. Is there any way to calculate the average of values for each key ?

// I calculate the sum like this and don't know how to calculate the avg
reduceByKey((x,y)=>(x+y)).collect


Array(((Type1,1),4.0), ((Type1,1),9.2), ((Type1,2),8), ((Type1,2),4.5), ((Type1,3),3.5), 
((Type1,3),5.0), ((Type2,1),4.6), ((Type2,1),4), ((Type2,1),10), ((Type2,1),4.3))
like image 708
finman Avatar asked Oct 17 '16 13:10

finman


2 Answers

One way is to use mapValues and reduceByKey which is easier than aggregateByKey.

.mapValues(value => (value, 1)) // map entry with a count of 1
.reduceByKey {
  case ((sumL, countL), (sumR, countR)) => 
    (sumL + sumR, countL + countR)
}
.mapValues { 
  case (sum , count) => sum / count 
}
.collect

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

like image 66
gabi Avatar answered Sep 30 '22 03:09

gabi


there's lots of ways... but a simple way is to just use a class that keeps track of your total and count and computes average at the end. something like this would work.

class AvgCollector(val tot: Double, val cnt: Int = 1) {
  def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt)
  def avg = tot / cnt 
}

val rdd2 = {
  rdd
  .map{ case (k,v) => (k, new AvgCollector(v)) }
  .reduceByKey(_ combine _)
  .map{ case (k,v) => (k, v.avg) }
}

... or you could use aggregateByKey with a tweak to the class

class AvgCollector(val tot: Double, val cnt: Int = 1) {
  def ++(v: Double) = new AvgCollector(tot + v, cnt + 1)
  def combine(that: AvgCollector) = new AvgCollector(tot + that.tot, cnt + that.cnt)
  def avg = if (cnt > 0) tot / cnt else 0.0
}

rdd2 = {
  rdd
  .aggregateByKey( new AvgCollector(0.0,0) )(_ ++ _, _ combine _ )
  .map{ case (k,v) => (k, v.avg) }
}
like image 40
kmh Avatar answered Sep 30 '22 02:09

kmh