I'm trying to learn to use DataFrames and Datasets more, in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x, y) => x + y), but I don't see that function for Dataset. So I decided to write one.
someRdd.map(x => ((x.fromId, x.toId), 1))
  .map(x => collection.mutable.Map(x))
  .reduce((x, y) => {
    val result = mutable.HashMap.empty[(Long, Long), Int]
    val keys = mutable.HashSet.empty[(Long, Long)]
    y.keys.foreach(z => keys += z)
    x.keys.foreach(z => keys += z)
    for (elem <- keys) {
      val s1 = if (x.contains(elem)) x(elem) else 0
      val s2 = if (y.contains(elem)) y(elem) else 0
      result(elem) = s1 + s2
    }
    result
  })
However, this returns everything to the driver. How would you write this to return a Dataset? Maybe mapPartitions and do it there?

Note: this compiles but does not run, because there is no encoder for Map yet.
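As an aside, I suspect the missing encoder itself could be worked around by registering a Kryo-based encoder explicitly (untested sketch below, assuming the pipeline above really starts from a Dataset), but that still would not answer the question of how to keep the reduce distributed:

// Untested idea: provide an explicit Kryo encoder so Spark can serialize the
// mutable Map values. Note that Kryo-encoded data is an opaque blob to Spark SQL.
import scala.collection.mutable
import org.apache.spark.sql.{Encoder, Encoders}

implicit val mapEncoder: Encoder[mutable.Map[(Long, Long), Int]] =
  Encoders.kryo[mutable.Map[(Long, Long), Int]]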
reduceByKey is not available on a single-value or regular RDD, only on a pair RDD.

In Spark, the reduceByKey function is a frequently used transformation that performs aggregation of data. It receives key-value pairs (K, V) as input, aggregates the values by key, and produces a dataset of (K, V) pairs as output.
groupByKey can cause out-of-disk problems because all of the values are sent over the network and collected on the reduce-side workers. With reduceByKey, data is combined within each partition first, so only one output per key per partition is sent over the network. reduceByKey requires combining your values into another value of the exact same type.
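To make the difference concrete, here is a small word-count style sketch (sc is a SparkContext; the data is made up):

// Hypothetical pair RDD of (word, 1) tuples
val pairs = sc.parallelize(Seq("a", "b", "a", "c", "a")).map(word => (word, 1))

// groupByKey ships every (key, value) pair across the network and
// collects all values for a key before summing them:
val viaGroupByKey = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values within each partition first, so only one
// partial sum per key per partition crosses the network:
val viaReduceByKey = pairs.reduceByKey(_ + _)
// Both produce the same (word, count) result; reduceByKey shuffles far less data.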
A Dataset is a strongly typed DataFrame, so both Dataset and DataFrame can use .rdd to convert to an RDD.
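So the simplest workaround is to drop down to the RDD API and come back, roughly like this (the Edge case class is a stand-in for the question's element type, and a SparkSession named spark is assumed):

import spark.implicits._   // needed for .toDS() on the way back

// Stand-in for whatever the Dataset actually contains
case class Edge(fromId: Long, toId: Long)

val edges = Seq(Edge(1L, 2L), Edge(1L, 2L), Edge(3L, 4L)).toDS()

val counts = edges.rdd                 // Dataset[Edge] -> RDD[Edge]
  .map(e => ((e.fromId, e.toId), 1))   // pair RDD keyed by (fromId, toId)
  .reduceByKey(_ + _)                  // distributed, map-side combined aggregation
  .toDS()                              // back to a Dataset[((Long, Long), Int)]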
I assume your goal is to translate this idiom to Datasets:
rdd.map(x => (x.someKey, x.someField))
  .reduceByKey(_ + _)
// => returning an RDD of (KeyType, FieldType)
Currently, the closest solution I have found with the Dataset API looks like this:
ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]
// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V's are already key-value pairs.
This doesn't look very elegant, and according to a quick benchmark it is also much less performant, so maybe we are missing something here...
Note: An alternative might be to use groupByKey(_.someKey) as a first step. The problem is that using groupByKey changes the type from a regular Dataset to a KeyValueGroupedDataset. The latter does not have a regular map function. Instead, it offers mapGroups, which does not seem very convenient because it wraps the values into an Iterator and, according to the docstring, performs a shuffle.
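For reference, that mapGroups variant would look roughly like this (someKey and someField are the same placeholder names as above, and the implicit encoders from spark.implicits._ are assumed); each group arrives as an Iterator that has to be folded down manually:

// Sketch of the groupByKey(_.someKey).mapGroups alternative described above.
val reduced = ds
  .groupByKey(_.someKey)
  .mapGroups { (key, values) =>
    (key, values.map(_.someField).reduce(_ + _))
  }
// => Dataset[(KeyType, FieldType)], but only after a full shuffle of the values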
A more efficient solution uses mapPartitions before groupByKey to reduce the amount of shuffling (note this is not the exact same signature as reduceByKey, but I think it is more flexible to pass a function than to require that the dataset consist of a tuple).
import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, Encoder}

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
    (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
  // Pre-aggregate within each partition, then do the final reduce after the shuffle.
  def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
  }
  ds.mapPartitions(h(f, g, _))
    .groupByKey(f)(encK)
    .reduceGroups(g)
}
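As a usage sketch (Record and its field names are placeholders, and import spark.implicits._ is assumed to be in scope for the encoders):

case class Record(someKey: String, someField: Int)

val records: Dataset[Record] =
  Seq(Record("a", 1), Record("a", 2), Record("b", 5)).toDS()

val reduced: Dataset[(String, Record)] =
  reduceByKey[Record, String](records, _.someKey,
    (a, b) => Record(a.someKey, a.someField + b.someField))
// reduced contains ("a", Record("a", 3)) and ("b", Record("b", 5))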
Depending on the shape/size of your data, this is within 1 second of the performance of reduceByKey, and about 2x as fast as groupByKey(_._1).reduceGroups. There is still room for improvement, so suggestions would be welcome.