
Using reduceByKey in Apache Spark (Scala)


I have a list of tuples of type (user id, name, count).

For example,

val x = sc.parallelize(List(
  ("a", "b", 1),
  ("a", "b", 1),
  ("c", "b", 1),
  ("a", "d", 1)))

I'm attempting to reduce this collection so that, for each user id, the counts for each name are summed.

So the val x above is converted to:

(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))

Here is the code I am currently using:

val byKey = x.map { case (id, uri, count) => (id, uri) -> count }
val grouped = byKey.groupByKey
val count = grouped.map { case ((id, uri), count) => (id, (uri, count.sum)) }
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

grouped2.foreach(println)

I'm attempting to use reduceByKey as it performs faster than groupByKey.

How can reduceByKey be used instead of the code above to produce the same mapping?

asked Jun 05 '14 by blue-sky


People also ask

How do you use reduceByKey in Spark?

In Spark, the reduceByKey function is a frequently used transformation operation that performs aggregation of data. It receives key-value pairs (K, V) as input, aggregates the values based on the key, and generates a dataset of (K, V) pairs as output.
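
As a minimal sketch (assuming an existing SparkContext named sc and made-up data), a word-count style aggregation looks like this:

// Sum the values for each key; `sc` is assumed to be an existing SparkContext.
val pairs = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))
val counts = pairs.reduceByKey(_ + _)

counts.collect.foreach(println)
// (a,2)
// (b,1)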

Can we use reduceByKey in Spark Dataframe?

reduceByKey is not available on a single-value (regular) RDD; it is only defined on a pair RDD.
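
For example (a sketch with hypothetical data), a regular RDD must first be mapped into (key, value) tuples before reduceByKey becomes available:

// A plain RDD[String] has no reduceByKey; map it to a pair RDD first.
val words = sc.parallelize(List("spark", "scala", "spark"))
val counted = words
  .map(w => (w, 1))      // RDD[(String, Int)] -- now a pair RDD
  .reduceByKey(_ + _)    // available via PairRDDFunctions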

How do I use reduceByKey in RDD?

reduceByKey merges the values for each key using an associative and commutative reduce function. It also performs the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
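
Because the function only needs to be associative and commutative, it is not limited to sums. A small sketch (hypothetical data) keeping the maximum value per key:

// Any associative, commutative function works; here we keep the max per key.
val scores = sc.parallelize(List(("u1", 3), ("u1", 7), ("u2", 5)))
val best = scores.reduceByKey((a, b) => math.max(a, b))

best.collect.foreach(println)
// (u1,7)
// (u2,5)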

What does the reduceByKey function do?

PySpark's reduceByKey() transformation is used to merge the values of each key using an associative reduce function on a PySpark RDD. It is a wide transformation, as it shuffles data across multiple partitions, and it operates on pair RDDs (key/value pairs).


1 Answer

Following your code:

val byKey = x.map { case (id, uri, count) => (id, uri) -> count }

You could do:

val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)

PairRDDFunctions[K,V].reduceByKey takes an associative reduce function that can be applied to values of type V of the RDD[(K,V)]. In other words, you need a function f[V](e1:V, e2:V) : V. In this particular case, with sum on Ints: (x:Int, y:Int) => x+y, or _ + _ in shorthand underscore notation.
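
To get all the way to the grouped shape asked for in the question, one possible follow-up (a sketch; the exact buffer type printed varies by Spark version) is to re-key by id and group the (uri, count) pairs:

// Re-key by id, then group the (uri, count) pairs per user.
val regrouped = reducedByKey
  .map { case ((id, uri), count) => (id, (uri, count)) }
  .groupByKey

regrouped.collect.foreach(println)
// (a,CompactBuffer((b,2), (d,1)))  -- element order is not guaranteed
// (c,CompactBuffer((b,1)))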

For the record: reduceByKey performs better than groupByKey because it attempts to apply the reduce function locally before the shuffle/reduce phase. groupByKey will force a shuffle of all elements before grouping.
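
For comparison, a groupByKey formulation that yields the same sums (a sketch) would be:

// Same result, but every element is shuffled before any summing happens:
val viaGroup = byKey.groupByKey.mapValues(_.sum)
// reduceByKey(_ + _) pre-aggregates within each partition before the shuffle,
// so much less data crosses the network.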

answered Oct 09 '22 by maasg