
Understanding closures and parallelism in Spark

I am trying to understand how certain things work in Spark, using the example shown at http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

It says that the code will sum the values within the RDD and store the result in counter, but that is not what happens here: it doesn't work. It only works if you remove parallelize.

Can someone explain to me how this works? Or is the example wrong?

Thanks

val data = Array(1,2,3,4,5)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
Nick asked Oct 05 '15 09:10



1 Answer

Nick, the example and its explanation in the guide are correct. Let me explain in more detail.

Suppose we are working on a single node, with a single worker and a single executor, and we use foreach over an RDD to sum its elements. Because everything runs on one node, the data is not distributed and stays in one piece. The function passed to foreach, together with the variables it references (here, counter), is called a closure. In this setup the closure is applied to every element, and each increment may be applied directly to the driver's variable.

Driver node -> The executor and the driver reside on the same node (in local mode, possibly in the same JVM), so the driver's counter variable can be within the executor's scope, and the executor's updates can change the driver's counter.

The counter value that gets printed comes from the driver, not from the executor.

Executor -> closure -> data

Now suppose we are working in a clustered environment, say 2 nodes with 2 workers and 2 executors. The data will be split into several parts:

Data -> Data_1, Data_2

Driver node -> It is on a different node and has its own counter variable, which is not visible to Executor 1 and Executor 2 because they reside on different nodes. Hence Executor 1 and Executor 2 can't update the counter variable on the driver node.

Executor 1 -> processing(Data_1) with closure_1
Executor 2 -> processing(Data_2) with closure_2

Closure 1 updates only the copy of counter on Executor 1, because the closure is serialized and sent to Executor 1. Similarly, Closure 2 updates only the copy on Executor 2. The driver's counter is never changed.
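
The copy semantics described above can be reproduced without a cluster. The following is a minimal sketch, not Spark's actual mechanism: it assumes that shipping a closure behaves roughly like a round trip through Java serialization. The names AddToCounter, ClosureCopyDemo and shipToExecutor are hypothetical and exist only for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a closure: the function body plus the variable it captured.
final class AddToCounter(var counter: Int) extends Serializable {
  def apply(x: Int): Unit = counter += x
}

object ClosureCopyDemo {
  // Assumption: sending a task to an executor is modelled as serialize + deserialize,
  // which yields an independent copy of the task and its captured state.
  def shipToExecutor(task: AddToCounter): AddToCounter = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(task)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[AddToCounter] finally in.close()
  }

  // Returns (counter seen by the driver, counter seen by the simulated executor).
  def run(): (Int, Int) = {
    val driverTask = new AddToCounter(0)
    val executorTask = shipToExecutor(driverTask)
    // The "executor" works on its own copy; the driver's object is never touched.
    Array(1, 2, 3, 4, 5).foreach(x => executorTask(x))
    (driverTask.counter, executorTask.counter)
  }

  def main(args: Array[String]): Unit = {
    val (driverSide, executorSide) = run()
    // prints "driver counter = 0, executor counter = 15"
    println(s"driver counter = $driverSide, executor counter = $executorSide")
  }
}
```

The driver-side value stays 0 because only the deserialized copy was mutated, which is the same reason the println in the question does not show the sum on a cluster.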

To handle this situation, use an accumulator, which executors can add to and whose value the driver can read:

val counter = sc.accumulator(0)
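
For completeness, here is a hedged sketch of the whole program using an accumulator. It assumes Spark 2.0 or later, where sc.accumulator is deprecated in favour of sc.longAccumulator, and it assumes a local master (local[2]) purely so the sketch can run on one machine. The object name AccumulatorSum is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSum {
  def run(): Long = {
    // Assumption: a local master with two threads, chosen only for this sketch.
    val conf = new SparkConf().setAppName("AccumulatorSum").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Executors add to the accumulator; only the driver reads its value.
      val counter = sc.longAccumulator("counter")
      val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
      rdd.foreach(x => counter.add(x.toLong))
      counter.value
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = {
    // prints "Counter value: 15"
    println("Counter value: " + run())
  }
}
```

Unlike the plain var in the question, the accumulator's partial results are sent back and merged on the driver, so the printed value is the sum regardless of how many executors processed the data.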
Kshitij Kulshrestha answered Oct 19 '22 15:10