I'm relatively new to spark and I'm trying to group data by multiple keys at the same time.
I have some data that I map so it ends up looking like this:
((K1,K2,K3),(V1,V2))
My goal is to group by (K1,K2,K3) and respectively sum V1 and V2 to end up with:
((K1,K2,K3), (SUM(V1), SUM(V2)))
Here is the code I have so far:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val filepath = "file.avro"
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val data = sqlContext.read.avro(filepath)
val dataRDD = data.rdd
val mappedDataRDD = dataRDD.map {
  case (v, w, x, y, z) => ((v, w, x), (y, z))
}.reduceByKey((x, y) => ???)
So I'm looking for how to write the reduceByKey function so that I can group by the (v,w,x) keys and sum the y and z values.
In Spark, reduceByKey is a frequently used transformation that aggregates data: it takes key-value pairs (K, V) as input, aggregates the values by key, and produces a dataset of (K, V) pairs as output.
reduceByKey is not available on a regular (single-value) RDD, only on a pair RDD.
Both reduceByKey and groupByKey are wide transformations, which means both trigger a shuffle. The key difference is that reduceByKey does a map-side combine and groupByKey does not.
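To make that difference concrete, here is a minimal sketch on a small hypothetical sample RDD (the names pairs, reduced and grouped are just for illustration); both produce the same per-key sums, but reduceByKey pre-aggregates on each partition before the shuffle:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// reduceByKey combines values per key within each partition first (map-side combine),
// then shuffles only the partial sums.
val reduced = pairs.reduceByKey(_ + _)            // ("a", 4), ("b", 2)
// groupByKey shuffles every (key, value) pair and aggregates only afterwards.
val grouped = pairs.groupByKey().mapValues(_.sum) // same result, more data shuffled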
I think what you are looking for and should use is aggregateByKey.
This method takes two parameter groups. The first parameter group takes the starting value of the accumulator. The second parameter group takes two functions: one that folds a value into the accumulator within a partition, and one that merges two accumulators across partitions.
Now you can use it as follows,
val (accZeroY, accZeroZ): (Long, Long) = (0, 0)

val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v, w, x), (y, z))
  })
  .aggregateByKey((accZeroY, accZeroZ))(
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) },
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )
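As a quick sanity check, here is a usage sketch on a small hypothetical in-memory sample (your real dataRDD comes from the Avro file, of course):
val sample = sc.parallelize(Seq(
  (1, 2, 1, 1, 1),
  (1, 2, 1, 2, 2),
  (1, 3, 2, 4, 4)
))
sample
  .map { case (v, w, x, y, z) => ((v, w, x), (y, z)) }
  .aggregateByKey((0L, 0L))(
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) },
    { case ((accY1, accZ1), (accY2, accZ2)) => (accY1 + accY2, accZ1 + accZ2) }
  )
  .collect()
// Array(((1,2,1),(3,3)), ((1,3,2),(4,4))) -- values accumulated as (Long, Long)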
As you may have observed, both functions in the second parameter group are actually the same in this case. That is only possible when the type of the accumulator is the same as the type of the value in the key-value RDD (pair RDD).
In such cases you can also use reduceByKey, which you can think of as an aggregateByKey with the same function passed for both function parameters:
val mappedDataRDD = dataRDD
  .map({
    case (v, w, x, y, z) => ((v, w, x), (y, z))
  })
  .reduceByKey(
    { case ((accY, accZ), (y, z)) => (accY + y, accZ + z) }
  )
But in my opinion, you should NOT use reduceByKey. The reason I suggested aggregateByKey is that accumulating values over large datasets can sometimes produce a result that is outside the range of your type.
For example in your case, I suspect that your (y, z) is actually an (Int, Int) and you want to accumulate it using (v, w, x) as the key. But whenever you are adding up a large number of Ints... remember that the result can end up bigger than what an Int can handle.
So you will want the type of your accumulator to be something with a bigger range than (Int, Int), like (Long, Long), and reduceByKey does not allow you to do that, because its result type must be the same as the value type. And so... I would say that perhaps what you are looking for and should use is aggregateByKey.
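That said, if you do want to keep reduceByKey and still avoid Int overflow, a minimal sketch (assuming y and z really are Ints; summed is just an illustrative name) is to widen the values to Long in the map step, so the value type itself has the bigger range:
val summed = dataRDD
  .map { case (v, w, x, y, z) => ((v, w, x), (y.toLong, z.toLong)) } // widen before reducing
  .reduceByKey { case ((y1, z1), (y2, z2)) => (y1 + y2, z1 + z2) }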
You can also use reduceByKey; you just have to be careful about what you want. I simplified the example, but it shows what you want.
val rdd = sc.parallelize(List(
  (1, 2, 1, 1, 1),
  (1, 2, 1, 2, 2),
  (1, 3, 2, 4, 4)))

rdd.map {
  case (k1, k2, k3, v1, v2) => ((k1, k2, k3), (v1, v2))
}.reduceByKey {
  // You receive two values which are actually tuples, so we treat them like that.
  case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)
}.collect()
// res0: Array[((Int, Int, Int), (Int, Int))] = Array(((1,2,1),(3,3)), ((1,3,2),(4,4)))