I am trying to understand how certain things work in Spark. In the example shown at http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka
it says that the code will sum the values within the RDD and store the result in counter, which is not the case here, because it doesn't work. It only works if you remove parallelize.
Can someone explain to me how this works? Or is the example wrong?
Thanks
val data = Array(1,2,3,4,5)
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Nick, the example and the explanation provided above are absolutely correct; let me explain it to you in depth ->
Let us suppose we are working on a single node, with a single worker and executor, and we have used foreach over an RDD to sum up the elements of the RDD. Since we are on a single node, the data is not distributed; it remains a single entity, and the counter variable (a variable captured this way becomes part of the task's closure) is incremented for every element.
Driver node -> here the executor and the driver reside on the same node (in fact, the same JVM), so the driver's counter variable is within the executor's reach, and every increment made by the executor updates the driver's copy directly.
That is why the resulting count we read back comes from the driver node, not from a copy left behind on an executor.
Executor -> closure -> data
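As the Spark programming guide itself notes, in local mode the foreach tasks may execute in the same JVM as the driver and reference the very same counter object, which is why the question's snippet can appear to work once parallelism is taken out of the picture. A minimal sketch of that setup (assuming master local[1]; the app name and the visible update are illustrative, not guaranteed behavior):

import org.apache.spark.{SparkConf, SparkContext}

// Assumed setup: one JVM shared by the driver and the single executor.
val conf = new SparkConf().setAppName("closure-demo").setMaster("local[1]")
val sc = new SparkContext(conf)

var counter = 0
sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(x => counter += x)
// May print 15 here, because the task's closure can reference the original
// counter object; on a real cluster the driver's counter would still be 0.
println("Counter value: " + counter)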
Now suppose we are working in a clustered environment, say 2 nodes with 2 workers and executors. Now the data will be split into several parts ->
Data -> Data_1, Data_2
Driver node -> it lives on a different node and has its own counter variable, which is not visible to Executor 1 and Executor 2 because they reside on other nodes; hence Executor 1 and Executor 2 cannot update the counter variable on the driver node.
Executor1 -> processing(Data_1) with closure_1
Executor2 -> processing(Data_2) with closure_2
Closure 1 will update only Executor 1's copy of the counter, because the closure (with the variables it captures) is serialized and shipped to Executor 1; similarly, closure 2 will update only Executor 2's copy. Neither update ever reaches the driver.
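If all you need is the sum, a simple way around this is to let an RDD action do the aggregation and return the result to the driver. A minimal sketch using reduce (a standard RDD action), with no shared mutable state:

val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)
// reduce computes partial sums on the executors and combines them on the driver
val sum = rdd.reduce(_ + _)
println("Sum: " + sum) // 15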
Another way to tackle this situation is to use an Accumulator, like this:
val counter = sc.accumulator(0)
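Note that sc.accumulator is the pre-2.0 API and has been deprecated since Spark 2.0 in favor of sc.longAccumulator. A minimal end-to-end sketch with the newer API (the accumulator name here is just for illustration):

val counter = sc.longAccumulator("counter")
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
// Accumulator updates made inside tasks are merged back on the driver,
// which is exactly what the captured var could not do on a cluster.
rdd.foreach(x => counter.add(x))
println("Counter value: " + counter.value) // 15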