
Rolling your own reduceByKey in Spark Dataset

I'm trying to learn to use DataFrames and Datasets more, in addition to RDDs. For an RDD, I know I can do someRDD.reduceByKey((x,y) => x + y), but I don't see that function for Dataset. So I decided to write one.

someRdd
  .map(x => ((x.fromId, x.toId), 1))
  .map(x => collection.mutable.Map(x))
  .reduce((x, y) => {
    val result = mutable.HashMap.empty[(Long, Long), Int]
    val keys = mutable.HashSet.empty[(Long, Long)]
    y.keys.foreach(z => keys += z)
    x.keys.foreach(z => keys += z)
    for (elem <- keys) {
      val s1 = if (x.contains(elem)) x(elem) else 0
      val s2 = if (y.contains(elem)) y(elem) else 0
      result(elem) = s1 + s2
    }
    result
  })

However, this returns everything to the driver. How would you write this so that it returns a Dataset? Maybe use mapPartitions and do it there?

Note: this compiles but does not run, because there are no encoders for Map yet.

asked Jul 14 '16 by Carlos Bribiescas


People also ask

Can we use reduceByKey in spark Dataframe?

reduceByKey is not available on a single-value or regular RDD, only on a pair RDD (an RDD[(K, V)]). It is not defined on a DataFrame either, so there you would drop down to the underlying RDD or use groupBy with an aggregation instead.

How does reduceByKey work in spark?

In Spark, the reduceByKey function is a frequently used transformation that performs aggregation of data. It receives key-value pairs (K, V) as input, aggregates the values by key, and generates a dataset of (K, V) pairs as output.
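For example, a minimal sketch of this behaviour (assuming an existing SparkContext sc):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Partial sums are computed inside each partition before the shuffle,
// then merged across partitions.
val sums = pairs.reduceByKey(_ + _)

sums.collect()  // Array(("a", 4), ("b", 2)) -- order may vary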

What is the difference between groupByKey and reduceByKey explain by using the suitable coding example?

groupByKey can cause out-of-disk problems because all of the values are sent over the network and collected on the reducer workers. With reduceByKey, data is combined at each partition, so only one output per key per partition is sent over the network. The trade-off is that reduceByKey requires combining your values into another value of the exact same type.
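A minimal sketch contrasting the two on a word count (assuming an existing SparkContext sc):

val pairs = sc.parallelize(Seq("spark", "rdd", "spark")).map(w => (w, 1))

// groupByKey ships every (word, 1) pair across the network, then sums per key.
val countsGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums within each partition first, so only one partial
// count per key per partition is shuffled.
val countsReduce = pairs.reduceByKey(_ + _)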

Can we convert dataset to RDD?

A Dataset is a strongly typed DataFrame, so both Dataset and DataFrame can use .rdd to convert to an RDD.
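A minimal sketch (assuming an existing SparkSession spark and a hypothetical case class Person):

case class Person(name: String, age: Int)
import spark.implicits._

val ds = Seq(Person("Ann", 30), Person("Bob", 25)).toDS()

val fromDs = ds.rdd          // RDD[Person]
val fromDf = ds.toDF().rdd   // RDD[org.apache.spark.sql.Row]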


2 Answers

I assume your goal is to translate this idiom to Datasets:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)
// => returning an RDD of (KeyType, FieldType)

Currently, the closest solution I have found with the Dataset API looks like this:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]
// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V's are already key-value pairs.
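To connect this back to the question, here is a rough usage sketch with a hypothetical case class Edge(fromId: Long, toId: Long), counting occurrences of each (fromId, toId) pair (assuming an existing SparkSession spark):

import org.apache.spark.sql.Dataset
import spark.implicits._

case class Edge(fromId: Long, toId: Long)

val edges: Dataset[Edge] = Seq(Edge(1L, 2L), Edge(1L, 2L), Edge(3L, 4L)).toDS()

// Same pattern as above: pair up, group by the key part, reduce, unwrap.
val counts: Dataset[((Long, Long), Int)] =
  edges.map(e => ((e.fromId, e.toId), 1))
       .groupByKey(_._1)
       .reduceGroups((a, b) => (a._1, a._2 + b._2))
       .map(_._2)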

This doesn't look very elegant, and according to a quick benchmark it is also much less performant, so maybe we are missing something here...

Note: An alternative might be to use groupByKey(_.someKey) as a first step. The problem is that using groupByKey changes the type from a regular Dataset to a KeyValueGroupedDataset. The latter does not have a regular map function. Instead it offers mapGroups, which does not seem very convenient because it wraps the values in an Iterator and, according to the docstring, performs a shuffle. A rough sketch of that variant follows.
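For completeness, a minimal sketch of that mapGroups variant, reusing the hypothetical edges Dataset from above; it skips the initial map, but each group's values arrive as an Iterator:

// Counts per (fromId, toId) via groupByKey + mapGroups.
val countsViaMapGroups: Dataset[((Long, Long), Int)] =
  edges.groupByKey(e => (e.fromId, e.toId))
       .mapGroups((key, values) => (key, values.size))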

answered Sep 21 '22 by bluenote10


A more efficient solution uses mapPartitions before groupByKey to reduce the amount of shuffling (note that this does not have exactly the same signature as reduceByKey, but I think it is more flexible to pass a function than to require that the dataset consist of tuples).

import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, Encoder}

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
    (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
  // Pre-aggregate within each partition before the shuffle.
  def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
  }
  ds.mapPartitions(h(f, g, _))
    .groupByKey(f)(encK)
    .reduceGroups(g)
}
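As a rough usage sketch, with a hypothetical weighted-edge case class (assuming an existing SparkSession spark), summing weights per (fromId, toId) pair:

case class WeightedEdge(fromId: Long, toId: Long, weight: Int)
import spark.implicits._

val weighted = Seq(
  WeightedEdge(1L, 2L, 1),
  WeightedEdge(1L, 2L, 3),
  WeightedEdge(3L, 4L, 1)).toDS()

// Result is a Dataset[((Long, Long), WeightedEdge)] whose values carry the summed weight.
val summed = reduceByKey[WeightedEdge, (Long, Long)](
  weighted,
  e => (e.fromId, e.toId),
  (a, b) => a.copy(weight = a.weight + b.weight))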

Depending on the shape and size of your data, this is within one second of the performance of reduceByKey, and about 2x as fast as groupByKey(_._1).reduceGroups. There is still room for improvement, so suggestions would be welcome.

answered Sep 19 '22 by Justin Raymond