
How to print an accumulator variable from within a task (it seems to "work" without calling the value method)?

I know that accumulator variables are 'write only' from the point of view of tasks executing on worker nodes. I was doing some testing on this and realized that I am able to print the accumulator value in a task.

Here I am initializing the accumulator in the driver:

scala> val accum  = sc.accumulator(123)
accum: org.apache.spark.Accumulator[Int] = 123

Then I define a function 'foo':

scala> def foo(pair:(String,String)) = { println(accum); pair }
foo: (pair: (String, String))(String, String)

In this function I simply print the accumulator and then return the same pair that was received.

Now I have an RDD called myrdd with the following type:

scala> myrdd
res13: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:21

And I am now calling the map transformation on this RDD:

myrdd.map(foo).collect
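
For completeness, here is the whole session in one runnable block (the actual contents of myrdd aren't shown above, so the four pairs below are just an assumption for illustration):

val accum = sc.accumulator(123)
// Assumed contents; the post doesn't show how myrdd was actually built.
val myrdd = sc.parallelize(Seq(("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")))
def foo(pair: (String, String)) = { println(accum); pair }
myrdd.map(foo).collect  // prints 0 once per element on the executors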

The 'collect' action is applied to force evaluation. What actually happens during this execution is that a zero (0) is printed for every element of the RDD. Since this RDD has 4 elements, 0 is printed 4 times. Because 'collect' is an action, it also prints all the elements at the end, but that's not really the focus here. So I have two questions:

  1. Logically, printing is equivalent to reading, because only when you can read can you print. So why is this allowed? Why was the exception not thrown (something that would definitely happen if we tried to 'return' the accumulator from the function)?
  2. Why is it printing 0 as the value of the accumulator, when we had initialized it to 123 in the driver?

After some experimentation I found that if I change the function definition to access the actual value property of the accumulator object (accum.value), and then trigger the RDD action as described above, it does indeed throw an exception:

scala> def foo(pair:(String,String)) = { println(accum.value); pair }

The exception caused during the RDD evaluation:

Can't read accumulator value in the task

So what I was doing earlier was printing the accumulator object itself. But the question still remains: why did it print 0? At the driver level, if I issue the same command that I used in the function definition, I do indeed get the value 123:

scala> println(accum)
123

I didn't have to call println(accum.value) for it to work. So why does it print 0 only when I issue this command in the function that the task executes?

asked Jul 19 '15 by Dhiraj




1 Answer

Why is it printing 0 as the value of the accumulator, when we had initialized it to 123 in the driver?

Because the worker nodes never see the initial value. The only thing that is passed to the workers is the zero, as defined in AccumulatorParam. For Accumulator[Int] it is simply 0. If you first update the accumulator, you'll see the updated local value:

val acc = sc.accumulator(123)
val rdd = sc.parallelize(List(1, 2, 3))
rdd.foreach(i => {acc += i; println(acc)})
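
Each task deserializes its own copy of the accumulator starting from the zero value, so with multiple partitions every printed number is just that task's partial sum, not a global running total.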

It is even clearer when you use a single partition:

rdd.repartition(1).foreach(i => {acc += i; println(acc)})
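
Here the three printed values are running prefix sums of the elements in whatever order they arrive in the single task, always ending at 6; assuming in-order processing of 1, 2, 3, the output would be:

1
3
6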

Why was the exception not thrown (...)?

Because the exception is thrown when you access the value method, and toString doesn't use it at all. Instead it uses the private value_ variable, the same one that is returned by value when the !deserialized check passes.
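
A simplified sketch of the relevant Spark 1.x logic, paraphrased from Accumulable (not the exact source):

// value is guarded: it only works on the driver, where deserialized is false.
def value: R =
  if (!deserialized) value_
  else throw new UnsupportedOperationException("Can't read accumulator value in task")

// toString reads the private value_ field directly, bypassing the guard.
override def toString: String =
  if (value_ == null) "null" else value_.toString

// When an accumulator is deserialized inside a task, Spark resets value_
// to the zero from AccumulatorParam and sets deserialized = true, which is
// why println(accum) prints 0 on the workers yet throws nothing.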

answered by zero323