Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey

Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documents regarding this, but couldn't understand the exact differences.

An explanation with examples would be great.

Asked by Arun S, Apr 12 '17

1 Answer

groupByKey:

Syntax:

sparkContext.textFile("hdfs://")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1))
            .groupByKey()
            .map { case (word, counts) => (word, counts.sum) }

groupByKey can cause out-of-disk (and out-of-memory) problems, because every (key, value) pair is shuffled over the network and collected on the reducer workers before any aggregation happens.
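To make the semantics concrete without a cluster, here is a hypothetical local simulation of the groupByKey word count using plain Scala collections; `groupBy` plays the role of `groupByKey`, materializing every pair before summing (on a real cluster, that materialization is the expensive shuffle):

```scala
object GroupByKeyDemo {
  // Count words the way the groupByKey pipeline does:
  // first group ALL (word, 1) pairs, then sum each group.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)                                     // analogous to groupByKey: every pair is kept
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("foo bar foo", "bar baz")))
}
```

The intermediate grouped collection holds one entry per occurrence of each word, which is exactly the data volume that groupByKey ships across the network.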

reduceByKey:

Syntax:

sparkContext.textFile("hdfs://")
            .flatMap(line => line.split(" "))
            .map(word => (word, 1))
            .reduceByKey((x, y) => x + y)

Data is combined within each partition, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
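The map-side combine can be sketched locally as follows (a simulation, not Spark API): each inner `Seq` stands in for a partition, each partition reduces its own pairs first, and only the per-partition results are merged, mirroring what reduceByKey sends over the network:

```scala
object ReduceByKeyDemo {
  // Simulate reduceByKey's two-phase reduction over hypothetical partitions.
  def reduceByKey(partitions: Seq[Seq[(String, Int)]],
                  f: (Int, Int) => Int): Map[String, Int] = {
    // Phase 1: combine within each partition (map-side combine)
    val perPartition = partitions.map(
      _.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(f)) })
    // Phase 2: merge the one-value-per-key-per-partition results
    perPartition.flatten.groupBy(_._1)
      .map { case (k, vs) => (k, vs.map(_._2).reduce(f)) }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(("foo", 1), ("foo", 1)), Seq(("foo", 1), ("bar", 1)))
    println(reduceByKey(parts, _ + _))
  }
}
```

After phase 1, each partition contributes at most one value per key, which is why reduceByKey shuffles far less data than groupByKey on the same input.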

aggregateByKey:

Same as reduceByKey, but it takes an initial (zero) value. It takes 3 parameters as input:

  1. initial (zero) value
  2. sequence op logic (folds a value into the accumulator within a partition)
  3. combiner logic (merges accumulators across partitions)

Example:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Output: Aggregate By Key sum results: bar -> 3, foo -> 5
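The same count can be reproduced locally with a small simulation of aggregateByKey's two functions (a sketch, not Spark API; each inner `Seq` is a hypothetical partition): the sequence op folds each value into the zero value within a partition, and the combiner op merges partition results.

```scala
object AggregateByKeyDemo {
  // Simulate aggregateByKey: seqOp within a partition, combOp across partitions.
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)
                             (seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] =
    partitions
      .map(_.groupBy(_._1).map { case (k, kvs) =>
        (k, kvs.map(_._2).foldLeft(zero)(seqOp))         // per-partition fold from zero
      })
      .flatten
      .groupBy(_._1)
      .map { case (k, us) => (k, us.map(_._2).reduce(combOp)) }  // merge partitions

  def main(args: Array[String]): Unit = {
    val kv = Seq(
      Seq(("foo", "A"), ("foo", "A"), ("bar", "C")),
      Seq(("foo", "A"), ("foo", "A"), ("foo", "B"), ("bar", "D"), ("bar", "D")))
    // Count values per key: seqOp ignores the String value, combOp adds counts.
    println(aggregateByKey(kv, 0)((n, _) => n + 1, _ + _))
  }
}
```

Note that the accumulator type (Int) differs from the value type (String), which is exactly what aggregateByKey allows and reduceByKey does not.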

combineByKey:

3 parameters as input:

  1. Initial value: unlike aggregateByKey, we need not pass a constant; we pass a function (createCombiner) that turns the first value seen for a key into the initial accumulator.
  2. Merging function (folds a further value into the accumulator within a partition)
  3. Combine function (merges accumulators across partitions)

Example:

// Per-key average: accumulate (sum, count), then divide.
val result = rdd.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map { case (k, v) => (k, v._1 / v._2.toDouble) }
result.collect.foreach(println)
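The same per-key average can be checked without a cluster using a local simulation of combineByKey's three functions (a sketch over hypothetical partitions, not Spark API):

```scala
object CombineByKeyDemo {
  // Simulate combineByKey computing a per-key average over local "partitions".
  def averageByKey(partitions: Seq[Seq[(String, Int)]]): Map[String, Double] = {
    val createCombiner = (v: Int) => (v, 1)                                   // first value -> (sum, count)
    val mergeValue     = (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1) // within a partition
    val mergeCombiners = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // across partitions

    val perPartition = partitions.map(
      _.groupBy(_._1).map { case (k, kvs) =>
        val vs = kvs.map(_._2)
        (k, vs.tail.foldLeft(createCombiner(vs.head))(mergeValue))
      })
    perPartition.flatten.groupBy(_._1)
      .map { case (k, accs) => (k, accs.map(_._2).reduce(mergeCombiners)) }
      .map { case (k, (sum, count)) => (k, sum / count.toDouble) }
  }

  def main(args: Array[String]): Unit =
    println(averageByKey(Seq(Seq(("a", 2), ("a", 4)), Seq(("a", 6), ("b", 3)))))
}
```

The key difference from aggregateByKey shows up in `createCombiner`: the initial accumulator is built from the first value itself rather than from a fixed zero value.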

reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey.

Reference: Avoid groupByKey

Answered by Vamshavardhan Reddy, Sep 20 '22