Spark's accumulator is confusing me

I am practicing Apache Spark and ran into the following problem.

val accum = sc.accumulator(0, "My Accumulator")
println(accum)  // prints: 0

sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(x => accum += x)
// sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(x => accum = accum + x)
println(accum.value)  // prints: 15

The line sc.parallelize(Array(1, 2, 3, 4, 5)).foreach(x => accum += x) works fine, but the commented-out version below it does not compile. The difference is between:

x => accum += x

and

x => accum = accum + x

Why does the second one not work?
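
Half of the problem can be reproduced in plain Scala with no Spark involved; a minimal sketch:

val a = 0
// a = a + 1   // does not compile: reassignment to val

var b = 0
b = b + 1      // fine: b is a var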

fluency03 asked Dec 04 '15

People also ask

How do you use an accumulator in Spark?

An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python). However, they cannot read its value.
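
A minimal Scala sketch of that pattern, assuming a Spark 1.x spark-shell where sc is already defined:

val counter = sc.accumulator(0, "counter")

// Tasks on the cluster may only add to it:
sc.parallelize(1 to 10).foreach { x =>
  counter += x      // the += operator ...
  counter.add(x)    // ... or the equivalent add method
}

// Only the driver may read the result:
println(counter.value)  // 110 (each element was added twice)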

Are Spark accumulators read-only?

No: accumulators are write-only from a worker's perspective. They are shared variables that are only "added" to through an associative and commutative operation, which is what lets Spark support them efficiently in parallel; only the driver can read their value. They can be used to implement counters (as in MapReduce) or sums.
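
In other words, workers add and the driver reads. A sketch (the exact exception raised by a worker-side read varies by Spark version):

val acc = sc.accumulator(0)

sc.parallelize(1 to 5).foreach { x =>
  acc += x
  // println(acc.value)  // would throw: accumulator values can't be read in a task
}

println(acc.value)  // 15, readable on the driver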

How do you use an accumulator in a PySpark DataFrame?

In PySpark, an accumulator is created with the accumulator() function of the SparkContext class; accumulators for custom types can be created using PySpark's AccumulatorParam class. The sparkContext.accumulator() call defines the accumulator variable.
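
The same idea exists on the Scala side via the AccumulatorParam trait. A sketch of a custom-type accumulator, assuming the Spark 1.x API used elsewhere in this thread:

import org.apache.spark.AccumulatorParam

// Accumulate sets of strings instead of numbers.
implicit object StringSetParam extends AccumulatorParam[Set[String]] {
  def zero(initial: Set[String]): Set[String] = Set.empty
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

val seen = sc.accumulator(Set.empty[String])
sc.parallelize(Seq("a", "b", "a")).foreach(w => seen += Set(w))
println(seen.value)  // Set(a, b)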

What are accumulators, and when are they truly reliable?

To answer "When are accumulators truly reliable?": when they are updated inside an action. Per the documentation, for accumulator updates performed inside actions, Spark guarantees that each task's update is applied only once, even if the task is restarted; updates inside transformations carry no such guarantee.
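
A sketch of the distinction (Spark 1.x API; the transformation-side count is illustrative and assumes the RDD is not cached):

val inAction = sc.accumulator(0, "in action")
val inTransform = sc.accumulator(0, "in transformation")

// Inside an action: Spark guarantees each task's update applies once.
sc.parallelize(1 to 100).foreach(_ => inAction += 1)

// Inside a transformation: updates can apply more than once if the
// RDD is recomputed (task retries, or simply running two actions on it).
val mapped = sc.parallelize(1 to 100).map { x => inTransform += 1; x }
mapped.count()
mapped.count()  // recomputes the map stage

println(inAction.value)     // 100
println(inTransform.value)  // 200: each count() recomputed the map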


2 Answers

There are three reasons why it doesn't work (see the sketch after this list):

  1. accum is a val, so it cannot be reassigned.
  2. The Accumulable class, which is the base class of Accumulator, provides only a += method, not +.
  3. Accumulators are write-only from the worker's perspective, so you cannot read their value inside an action. In theory a + method could modify accum in place, but that would be rather confusing.
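
A quick spark-shell sketch of the first two points (Spark 1.x API):

val accum = sc.accumulator(0)   // a val, of type Accumulator[Int]

accum += 1        // compiles: Accumulable defines a real += method
accum.add(1)      // compiles: the equivalent named method

// accum = accum + 1   // does not compile: reassignment to val,
//                     // and Accumulable has no + method anyway

Note that accum += 1 works on a val precisely because += is a real method here; Scala only desugars a += b into a = a + b when a has no += method of its own.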
zero323 answered Nov 15 '22

Believe it or not, an Accumulator in Apache Spark works like a write-only global variable. In imperative thinking we see no difference between x += 1 and x = x + 1, but here there is a subtle one: in Apache Spark the second form would require reading the accumulator's value, which workers cannot do, while the first form only adds to it. Or, put more simply (as zero323 said in his explanation), the + method just isn't implemented for that class. You can read about how this works on p. 41 of the Apache Spark slides extracted from the Introduction to Big Data with Apache Spark course.
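
The write-only idea can be mimicked in plain Scala; a hypothetical sketch (WriteOnlyCounter is for illustration only, not a Spark class):

// Hypothetical class: adding needs no read access, and there is
// no + method and no public getter, so callers cannot read total.
class WriteOnlyCounter {
  private var total = 0
  def +=(n: Int): Unit = total += n
}

val c = new WriteOnlyCounter   // a val, like accum
c += 1                         // fine: calls the += method
// c = c + 1                   // fails: no +, and c cannot be reassigned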

Alberto Bonsanto answered Nov 15 '22