I know that accumulator variables are 'write only' from the point of view of tasks while they are executing on worker nodes. I was doing some testing on this and I realized that I am able to print the accumulator value in the task.
Here I am initializing the accumulator in the driver:-
scala> val accum = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123
Then I go on to define a function 'foo':-
scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)
In this function I am simply printing the accumulator and then returning the same pair that was received.
Now I have an RDD called myrdd with the following type:-
scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21
And I am now calling the map transformation on this RDD:-
myrdd.map(foo).collect
The 'collect' action is applied to force evaluation. What actually happens during this execution is that a zero (0) is printed for every element of the RDD. Since this RDD has 4 elements, it prints 0 four times. Since the action 'collect' is there, it also prints all the elements at the end, but that's not really the focus here. So I have two questions:-
After some experimentation I found that if I change the function definition to access the actual value property of the accumulator object (accum.value), and then trigger the RDD action as described above, it does indeed throw the exception:-
scala> def foo(pair:(String,String)) = { println(accum.value); pair }
The exception thrown during the RDD evaluation:-
Can't read accumulator value in the task
So what I was doing earlier was printing the accumulator object itself. But the question still remains: why did it print 0? At the driver level, if I issue the same command that I used in the function definition, I do indeed get the value 123:-
scala> println(accum)
123
I didn't have to say println(accum.value) for it to work. So why does it print 0 only when I issue this command inside the function that the task uses?
Each of these accumulator classes has several methods; among these, the add() method is called from tasks running on the cluster. Tasks can't read values from the accumulator; only the driver program can read an accumulator's value, using the value() method.
Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value. Updates from the workers get propagated automatically to the driver program.
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums. This guide shows each of these features in each of Spark's supported languages.
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value.
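To make that contract concrete, here is a minimal sketch of the intended pattern, assuming the same Spark 1.x spark-shell session as in the question (sc is the SparkContext; the lineCount accumulator and the sample data are made up for illustration): tasks only add to the accumulator, and only the driver reads it via value.
val lineCount = sc.accumulator(0)                  // created on the driver
val data = sc.parallelize(Seq("a", "b", "c", "d"))
data.foreach(x => lineCount += 1)                  // tasks may only add to it
println(lineCount.value)                           // read on the driver only: prints 4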
Why is it printing 0 as the value of the accumulator, when we had initialized it as 123 in the driver?
Because worker nodes never see the initial value. The only thing that is passed to the workers is the zero, as defined in AccumulatorParam. For Accumulator[Int] it is simply 0. If you first update an accumulator you'll see the updated local value:
val acc = sc.accumulator(123)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.foreach(i => {acc += i; println(acc)})
It is even clearer when you use a single partition:
rdd.repartition(1).foreach(i => {acc += i; println(acc)})
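Putting the two sides next to each other makes the point clearer. The sketch below is illustrative, assuming a fresh accumulator in a local-mode spark-shell (so the task's println output shows up in the console) and that this foreach is the only action run against acc:
val acc = sc.accumulator(123)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.repartition(1).foreach { i =>
  acc += i
  // The task-local copy starts from the zero (0), not from 123, so the
  // running totals printed here climb from 0 up to 6 and never include 123.
  println(acc)
}
// Back on the driver, the merged updates are added to the initial value:
println(acc.value)                                 // 123 + (1 + 2 + 3) = 129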
Why was the exception not thrown (...)?
Because the exception is thrown when you access the value method, and toString is not using it at all. Instead it uses the private value_ variable, the same one that is returned by value if the !deserialized check passes.
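For reference, the relevant logic looks roughly like this. It is a simplified paraphrase built from the description above, not verbatim Spark source; only value, toString, value_ and the deserialized flag come from the actual Spark 1.x Accumulable class, the rest is illustrative:
class SimplifiedAccumulable[T](initialValue: T) extends Serializable {
  // Reset to the AccumulatorParam zero (not initialValue) when the object is
  // deserialized inside a task, at which point deserialized becomes true.
  @transient private var value_ : T = initialValue
  private var deserialized = false

  def value: T =
    if (!deserialized) value_                      // on the driver: returns value_
    else throw new UnsupportedOperationException("Can't read accumulator value in the task")

  // println(accum) goes through toString, which reads value_ directly and
  // therefore never hits the !deserialized check.
  override def toString: String = String.valueOf(value_)
}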