I always use reduceByKey when I need to group data in RDDs, because it performs a map-side reduce before shuffling the data, which often means that less data gets shuffled around and I thus get better performance. Even when the map-side reduce function collects all values and does not actually reduce the data amount, I still use reduceByKey, because I'm assuming that the performance of reduceByKey will never be worse than groupByKey. However, I'm wondering if this assumption is correct, or if there are indeed situations where groupByKey should be preferred?
reduceByKey() works better with larger datasets when compared to groupByKey(). In reduceByKey(), pairs on the same machine with the same key are combined (using the function passed into reduceByKey()) before the data is shuffled.
Both reduceByKey and groupByKey result in wide transformations, which means both trigger a shuffle operation. The key difference between reduceByKey and groupByKey is that reduceByKey does a map-side combine and groupByKey does not.
If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using aggregateByKey or reduceByKey will provide much better performance. Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory.
groupByKey can cause out-of-disk problems, because all the values for a key are sent over the network and collected on the reducer workers. With reduceByKey, data is combined at each partition first, so only one output per key per partition is sent over the network. You can see the sketch below.
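As a rough illustration of that difference, here is a minimal sketch (my own example, assuming a running SparkContext named sc) that computes a per-key sum both ways. With reduceByKey the partial sums are computed on each partition before the shuffle, while groupByKey ships every individual value across the network first:
// A small key-value RDD: (word, 1) pairs for a word-count-style sum.
val pairs = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map(w => (w, 1))
// reduceByKey: partial sums are computed per partition (map-side combine),
// so only one (key, partialSum) record per key per partition is shuffled.
val sumsReduce = pairs.reduceByKey(_ + _)
// groupByKey: every (key, 1) pair is shuffled, and the values are summed
// only after they have all been collected for each key.
val sumsGroup = pairs.groupByKey().mapValues(_.sum)
sumsReduce.collect()  // e.g. Array((a,3), (b,2), (c,1))
sumsGroup.collect()   // same result, but more data is moved during the shuffle
Both produce the same result; the difference is only in how much data crosses the network during the shuffle.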
I believe there are other aspects of the problem ignored by climbage and eliasah:
If the operation doesn't reduce the amount of data, it has to be, one way or another, semantically equivalent to groupByKey. Let's assume we have an RDD[(Int, String)]:
import scala.util.Random
Random.setSeed(1)
def randomString = Random.alphanumeric.take(Random.nextInt(10)).mkString("")
val rdd = sc.parallelize((1 to 20).map(_ => (Random.nextInt(5), randomString)))
and we want to concatenate all strings for a given key. With groupByKey it is pretty simple:
rdd.groupByKey.mapValues(_.mkString(""))
A naive solution with reduceByKey looks like this:
rdd.reduceByKey(_ + _)
It is short and arguably easy to understand, but it suffers from two issues: it is inefficient, because it creates a new String object every time*, and it suggests that the operation actually reduces the data when in fact it doesn't. To deal with the first problem we need a mutable data structure:
import scala.collection.mutable.StringBuilder
rdd.combineByKey[StringBuilder](
(s: String) => new StringBuilder(s),
(sb: StringBuilder, s: String) => sb ++= s,
(sb1: StringBuilder, sb2: StringBuilder) => sb1.append(sb2)
).mapValues(_.toString)
It still suggests something other than what is really going on, and it is quite verbose, especially if repeated multiple times in your script. You can of course extract the anonymous functions:
val createStringCombiner = (s: String) => new StringBuilder(s)
val mergeStringValue = (sb: StringBuilder, s: String) => sb ++= s
val mergeStringCombiners = (sb1: StringBuilder, sb2: StringBuilder) =>
sb1.append(sb2)
rdd.combineByKey(createStringCombiner, mergeStringValue, mergeStringCombiners)
but at the end of the day it still means additional effort to understand this code, increased complexity and no real added value. One thing I find particularly troubling is the explicit use of mutable data structures. Even if Spark handles almost all of the complexity, it means we no longer have elegant, referentially transparent code. My point is: if you really reduce the amount of data, by all means use reduceByKey. Otherwise you make your code harder to write and harder to analyze, and you gain nothing in return.
Note: This answer is focused on the Scala RDD API. The current Python implementation is quite different from its JVM counterpart and includes optimizations which provide a significant advantage over a naive reduceByKey implementation in the case of groupBy-like operations.
For the Dataset API, see DataFrame / Dataset groupBy behaviour/optimization.
* See Spark performance for Scala vs Python for a convincing example
reduceByKey and groupByKey both use combineByKey, with different combine/merge semantics.
The key difference I see is that groupByKey passes the flag mapSideCombine = false to the shuffle engine. Judging by the issue SPARK-772, this is a hint to the shuffle engine not to run the map-side combiner when the data size isn't going to change.
So I would say that if you are trying to use reduceByKey to replicate groupByKey, you might see a slight performance hit.