
How to use ReduceByKey on multiple key in a Scala Spark Job

I'm relatively new to spark and I'm trying to group data by multiple keys at the same time.

I have some data that I map so it ends up looking like this:

((K1,K2,K3),(V1,V2))

My goal is to group by (K1,K2,K3) and respectively sum V1 and V2 to end up with:

((K1,K2,K3), (SUM(V1),SUM(V2)))

Here is the code I have so far:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._  // provides sqlContext.read.avro

val filepath = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.read.avro(filepath)
val dataRDD = data.rdd

val mappedDataRDD = dataRDD.map {
   case (v, w, x, y, z) => ((v, w, x), (y, z))
}.reduceByKey((x, y) => ???)

So what I'm looking for is how to use reduceByKey to group by the (v,w,x) keys and sum the y and z values.

asked Sep 27 '16 by Vincent


People also ask

How do you use reduceByKey in Spark?

In Spark, the reduceByKey function is a frequently used transformation that performs aggregation of data. It takes key-value pairs (K, V) as input, combines the values for each key with the supplied function, and produces a dataset of (K, V) pairs as output.
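
For instance, here is a minimal sketch, assuming an existing SparkContext named sc:

val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// Values sharing a key are combined pairwise with the given function.
val summed = counts.reduceByKey(_ + _)
summed.collect()  // e.g. Array((a,4), (b,2)) -- element order is not guaranteed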

Can we use reduceByKey in Spark DataFrame?

reduceByKey is not available on a regular (single-value) RDD; it is only defined on a pair RDD, i.e. an RDD[(K, V)].
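
A minimal sketch of that distinction, with sc as an assumed SparkContext:

val nums = sc.parallelize(Seq(1, 2, 3, 4))
// nums.reduceByKey(_ + _)  // does not compile: RDD[Int] is not a pair RDD

// Mapping each element to a (key, value) tuple yields an RDD[(K, V)],
// which gains reduceByKey through the implicit PairRDDFunctions:
val byParity = nums.map(n => (n % 2, n)).reduceByKey(_ + _)
byParity.collect()  // e.g. Array((0,6), (1,4))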

What is the difference between groupByKey and reduceByKey in Spark?

Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle operation. The key difference is that reduceByKey performs a map-side combine before the shuffle, and groupByKey does not.
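
To make that concrete, a hedged sketch (sc is an assumed SparkContext); both compute the same per-key sums, but reduceByKey shuffles far less data:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// groupByKey ships every individual value across the network;
// aggregation only happens after the shuffle:
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey pre-aggregates within each partition (map-side combine),
// so only one partial sum per key per partition is shuffled:
val viaReduce = pairs.reduceByKey(_ + _)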


2 Answers

I think what you are looking for and should use is aggregateByKey.

This method takes two parameter groups. The first parameter group takes the starting value of the accumulator. The second parameter group takes two functions:

  1. A function that folds a value into the accumulator.
  2. A function that combines two accumulators.

Now you can use it as follows:

val (accZeroY, accZeroZ): (Long, Long) = (0L, 0L)

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v, w, x), (y, z))
  })
  .aggregateByKey((accZeroY, accZeroZ))(
    // seqOp: fold one value into the accumulator
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) },
    // combOp: merge two partial accumulators
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )

As you may have observed, both functions in the second parameter group are actually the same in this case. That is only possible when the type of the accumulator is the same as the type of the value in the key-value RDD (pair RDD).

In such cases you can also use reduceByKey, which you can think of as an aggregateByKey with the same function passed as both function parameters:

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v,w,x), (y, z))
  })
  .reduceByKey(
    { case ((accY, accZ), (y, z)) =>  (accY + y, accZ + z) }
  )

But in my opinion, you should NOT use reduceByKey here. The reason I suggested aggregateByKey is that accumulating values over a large dataset can sometimes produce a result outside the range of your type.

For example, in your case I suspect your (y, z) is actually an (Int, Int) and you want to accumulate it using (v, w, x) as the key. But whenever you add up a large number of Ints, remember that the result can end up bigger than what an Int can handle.

So you will want the type of your accumulator to have a bigger range than (Int, Int), such as (Long, Long), and reduceByKey does not allow that, because its result type must be the same as its value type. That is why I would say you are probably looking for, and should use, aggregateByKey.
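
To make the overflow point concrete, and to show one workaround that keeps reduceByKey (a sketch; pairRDD is a hypothetical RDD[((Int, Int, Int), (Int, Int))]):

// Int arithmetic wraps around silently on overflow:
println(Int.MaxValue + 1)  // prints -2147483648

// Widening the values to Long before reducing sidesteps the problem:
val widened = pairRDD
  .mapValues { case (y, z) => (y.toLong, z.toLong) }
  .reduceByKey { case ((y1, z1), (y2, z2)) => (y1 + y2, z1 + z2) }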

answered Oct 18 '22 by sarveshseri


You can also use reduceByKey; you just have to be careful about what you return. I simplified the example, but it shows what you want.

val rdd = sc.parallelize(List(
  (1, 2, 1, 1, 1), 
  (1, 2, 1, 2, 2),   
  (1, 3, 2, 4, 4)))

rdd.map {
  case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1, v2))
}.reduceByKey {
  // Each value is a (v1, v2) tuple, so we sum component-wise.
  case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)
}.collect()
// res0: Array[((Int, Int, Int), (Int, Int))] = Array(((1,2,1),(3,3)), ((1,3,2),(4,4)))

answered Oct 19 '22 by Alberto Bonsanto