I have a list of tuples of type (user id, name, count).
For example,
val x = sc.parallelize(List(("a", "b", 1), ("a", "b", 1), ("c", "b", 1), ("a", "d", 1)))
I'm attempting to reduce this collection so that each name is counted per user id.
So the above val x should be converted to:
(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))
Here is the code I am currently using:
val byKey = x.map({ case (id, uri, count) => (id, uri) -> count })
val grouped = byKey.groupByKey
val count = grouped.map { case ((id, uri), count) => (id, (uri, count.sum)) }
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey
grouped2.foreach(println)
I'm attempting to use reduceByKey, as it performs faster than groupByKey.
How can reduceByKey be implemented in place of the above code to produce the same mapping?
In Spark, reduceByKey is a frequently used transformation that aggregates data. It takes key-value pairs (K, V) as input, merges the values for each key using an associative and commutative reduce function, and produces a dataset of (K, V) pairs as output. It is available only on a pair RDD (key/value pairs), not on a regular RDD of single values. It is a wide transformation, since it shuffles data across partitions, but it also performs the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
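As a minimal, self-contained sketch of the pattern (assuming a running SparkContext named sc, as in the question):

// A pair RDD of (key, 1) records; reduceByKey sums the values per key.
val pairs = sc.parallelize(Seq(("b", 1), ("b", 1), ("d", 1)))
val sums = pairs.reduceByKey(_ + _) // RDD[(String, Int)]
sums.collect.foreach(println) // prints (d,1) and (b,2); order may vary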
Following your code:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
You could do:
val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)
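That reduces per (id, uri) pair. If you also want the grouped-by-id shape shown in the question, one way (a sketch, not part of the original answer; the final groupByKey runs on the already-reduced, much smaller RDD) is to re-key by id and group:

// Re-key the reduced pairs by id, then collect the (uri, total) pairs per id.
val byId = reducedByKey
  .map { case ((id, uri), total) => (id, (uri, total)) }
  .groupByKey()
byId.collect.foreach(println)
// e.g. (a,CompactBuffer((d,1), (b,2))) and (c,CompactBuffer((b,1)));
// older Spark versions print ArrayBuffer instead of CompactBuffer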
PairRDDFunctions[K,V].reduceByKey takes an associative reduce function that can be applied to the type V of the RDD[(K,V)]. In other words, you need a function f[V](e1:V, e2:V): V. In this particular case, with sum on Ints: (x:Int, y:Int) => x+y, or _ + _ in short underscore notation.
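For instance, using the byKey RDD from the question, these two calls are equivalent:

byKey.reduceByKey((x: Int, y: Int) => x + y) // explicit function literal
byKey.reduceByKey(_ + _) // underscore shorthand for the same function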
For the record: reduceByKey performs better than groupByKey because it attempts to apply the reduce function locally, before the shuffle/reduce phase, whereas groupByKey forces a shuffle of all elements before grouping.
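To make the contrast concrete (a sketch; both lines compute the same per-key sums, but the first ships every (key, 1) record across the network while the second pre-aggregates within each partition before shuffling):

val viaGroup = byKey.groupByKey().mapValues(_.sum) // shuffles every record, then sums
val viaReduce = byKey.reduceByKey(_ + _) // map-side combine, then shuffles partial sums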