
Why is the result of Spark reduceByKey not consistent?

I am trying to count the number of occurrences of each line with Spark, using Scala.
The following is my input:

1 vikram
2 sachin
3 shobit
4 alok
5 akul
5 akul
1 vikram
1 vikram
3 shobit
10 ashu
5 akul
1 vikram
2 sachin
7 vikram

Now I create two separate RDDs as follows.

val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map(s => (s.split(" ")(0), 1)) // create a tuple (key, 1)
// now if I create an RDD as
val rd1 = m1.reduceByKey((a, b) => a + b)
rd1.collect().foreach(println)
// I get the correct output every time
// output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

// but if I create an RDD as
val rd2 = m1.reduceByKey((a, b) => a + 1)
rd2.collect().foreach(println)
// I get an inconsistent result, i.e. sometimes I get this (WRONG)
// output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
// and sometimes I get this (CORRECT)
// output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

I am unable to understand why this is happening and which function to use where. I have also tried creating the RDD as

val m2 = f1.map(s => (s,1))
val rd3 = m2.reduceByKey((a,b) => a+1 )
// The same issue occurs with a + 1, but everything works fine with a + b

Vikram Singh Chandel asked Dec 25 '22 01:12


1 Answer

reduceByKey assumes the function you pass is commutative and associative (the docs state this explicitly). Your first function, (a, b) => a + b, is both; (a, b) => a + 1 is neither.
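To make the two properties concrete, here is a minimal sketch in plain Scala (no Spark needed) that brute-forces both checks over a few sample integers. The names sum, plusOne, samples, isCommutative and isAssociative are made up for this illustration, and checking a small range is only a sanity test, not a proof.

```scala
// The two reduce functions from the question.
val sum: (Int, Int) => Int = (a, b) => a + b
val plusOne: (Int, Int) => Int = (a, b) => a + 1

// A small, arbitrary range of sample values for the brute-force checks.
val samples = 0 to 5

// f is commutative if swapping the arguments never changes the result.
def isCommutative(f: (Int, Int) => Int): Boolean =
  samples.forall(a => samples.forall(b => f(a, b) == f(b, a)))

// f is associative if the grouping of three values never changes the result.
def isAssociative(f: (Int, Int) => Int): Boolean =
  samples.forall(a => samples.forall(b => samples.forall(c =>
    f(f(a, b), c) == f(a, f(b, c)))))

println(s"a + b: commutative=${isCommutative(sum)}, associative=${isAssociative(sum)}")
println(s"a + 1: commutative=${isCommutative(plusOne)}, associative=${isAssociative(plusOne)}")
```

For a + 1 both checks fail: f(0, 1) is 1 while f(1, 0) is 2, and f(f(a, b), c) is a + 2 while f(a, f(b, c)) is a + 1.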

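Why does this matter? reduceByKey first applies the supplied function to the values within each partition, and then applies it again to combine the per-partition results. In that second step b is a partial result, not a single record's 1. So b isn't always 1, and a + 1 simply throws away whatever b holds.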
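The two steps can be traced by hand in plain Scala. This is only a sketch: the split of the three records for key 5 into partitionA and partitionB is an assumption made for illustration, not something Spark guarantees, and all the names are invented here.

```scala
// The two reduce functions from the question.
val sum: (Int, Int) => Int = (a, b) => a + b
val plusOne: (Int, Int) => Int = (a, b) => a + 1

// Hypothetical split of the three ("5", 1) records across two partitions.
// Only the values are kept, since all of them share one key.
val partitionA = Seq(1, 1)
val partitionB = Seq(1)

// Step 1: reduce inside each partition.
val partialA = partitionA.reduce(plusOne) // 1 + 1 = 2
val partialB = partitionB.reduce(plusOne) // single element, returned as-is: 1

// Step 2: merge the partial results. Here b is partialA == 2, not 1.
val mergedPlusOne = plusOne(partialB, partialA) // 1 + 1 = 2, partialA is discarded
val mergedSum = sum(partialB, partialA)         // 1 + 2 = 3

println(s"a + 1 gives $mergedPlusOne, a + b gives $mergedSum")
```

With this split and merge order, a + 1 yields 2 for key 5, which matches the (5,2) in the question's wrong output.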

Think of the following scenario - the input contains 4 records, split into two partitions:

(aa, 1)
(aa, 1)

(aa, 1)
(cc, 1)

reduceByKey(f) on this input might be calculated as follows:

val intermediate1 = f((aa, 1), (aa, 1)) 
val intermediate2 = f((aa, 1), (cc, 1))

val result = f(intermediate2, intermediate1)

Now, let's follow this with f = (a, b) => a + b:

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)

And with f = (a, b) => a + 1:

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

// this is where it goes wrong:
val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)

The main point: the order of the intermediate calculations isn't guaranteed and may change between executions. For a + 1, which is neither commutative nor associative, this means the result is sometimes wrong.
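The effect of the merge order can be reproduced without a cluster. The sketch below is a simplified model in plain Scala, not Spark's actual execution: it reduces each partition, merges the partial results in a given order, and collects the distinct outcomes over every ordering of the two partitions from the scenario above. Partition, reduceWithin, simulate and outcomes are hypothetical helper names.

```scala
type Partition = Seq[(String, Int)]

// Step 1: reduce the values of each key inside a single partition.
def reduceWithin(part: Partition, f: (Int, Int) => Int): Map[String, Int] =
  part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

// Step 2: merge the per-partition results, in the order the partitions are given.
def simulate(parts: Seq[Partition], f: (Int, Int) => Int): Map[String, Int] =
  parts.map(reduceWithin(_, f)).foldLeft(Map.empty[String, Int]) { (acc, partial) =>
    partial.foldLeft(acc) { case (m, (k, v)) =>
      m.updated(k, m.get(k).map(f(_, v)).getOrElse(v))
    }
  }

// The two partitions from the scenario above.
val partitions: Seq[Partition] = Seq(
  Seq("aa" -> 1, "aa" -> 1),
  Seq("aa" -> 1, "cc" -> 1)
)

// Every distinct result obtainable by changing the merge order.
def outcomes(f: (Int, Int) => Int): Set[Map[String, Int]] =
  partitions.permutations.map(simulate(_, f)).toSet

val sumOutcomes = outcomes((a, b) => a + b)
val plusOneOutcomes = outcomes((a, b) => a + 1)

println(s"a + b: $sumOutcomes")
println(s"a + 1: $plusOneOutcomes")
```

In this model a + b produces a single outcome regardless of order, while a + 1 produces two: one with aa counted as 3 and one with aa counted as 2.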

Tzach Zohar answered Jan 05 '23 01:01