In the official Spark documentation, there is an example of an accumulator being used in a foreach call directly on an RDD:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
I implemented my own accumulator:
val myCounter = sc.accumulator(0)
val myRDD = sc.textFile(inputpath) // :spark.RDD[String]
myRDD.flatMap(line => foo(line)) // line 69
def foo(line: String) = {
myCounter += 1 // line 82 throwing NullPointerException
// compute something on the input
}
println(myCounter.value)
In a local setting this works just fine. However, if I run this job on a Spark standalone cluster with several machines, the workers throw a
13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
at MyClass$.foo(MyClass.scala:82)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at MyClass$$anonfun$2.apply(MyClass.scala:67)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
at spark.scheduler.ResultTask.run(ResultTask.scala:77)
at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
at the line which increments the accumulator myCounter.
My question is: Can accumulators only be used in "top-level" anonymous functions which are applied directly to RDDs and not in nested functions? If yes, why does my call succeed locally and fail on a cluster?
edit: increased verbosity of exception.
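For reference, one pattern that sidesteps referencing a field of the enclosing object from inside a task is to keep the accumulator in a local val and pass it to the helper explicitly, so the closure serializes the accumulator itself. A minimal sketch along those lines (the object name, helper signature, and input handling are illustrative, not the actual code from the question):

import org.apache.spark.{Accumulator, SparkConf, SparkContext}

object CounterSketch {
  // The accumulator is handed in as a parameter instead of being read
  // from a field of the enclosing object.
  def foo(line: String, counter: Accumulator[Int]): Seq[String] = {
    counter += 1              // increments are shipped back to the driver
    line.split("\\s+").toSeq  // stand-in for the real computation
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CounterSketch"))
    val myCounter = sc.accumulator(0)  // local val, captured by the closure
    val myRDD = sc.textFile(args(0))
    myRDD.flatMap(line => foo(line, myCounter)).count() // an action forces evaluation
    println(myCounter.value)
    sc.stop()
  }
}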
Accumulators are like global variables in a Spark application. In practice, accumulators are used as counters to keep track of something at the application level. They serve a very similar purpose to counters in MapReduce.
Accumulators are variables that are only “added” to through an associative operation and can therefore, be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
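As a sketch of what adding support for a new type can look like with the Spark 1.x AccumulatorParam API used throughout this question (the vector accumulator below is purely illustrative):

import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Hypothetical accumulator over Vector[Double] (Spark 1.x API).
object VectorAccumulatorParam extends AccumulatorParam[Vector[Double]] {
  // The neutral element used when merging partial results.
  def zero(initial: Vector[Double]): Vector[Double] =
    Vector.fill(initial.length)(0.0)

  // How two partial values are combined; must be associative.
  def addInPlace(v1: Vector[Double], v2: Vector[Double]): Vector[Double] =
    v1.zip(v2).map { case (a, b) => a + b }
}

object VectorAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("VectorAccumulatorExample"))
    val vecAccum = sc.accumulator(Vector(0.0, 0.0))(VectorAccumulatorParam)
    sc.parallelize(Seq(Vector(1.0, 2.0), Vector(3.0, 4.0))).foreach(v => vecAccum += v)
    println(vecAccum.value) // Vector(4.0, 6.0)
    sc.stop()
  }
}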
On shared variables more broadly, the documentation says:
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
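Here is a minimal sketch that uses both kinds of shared variables together, again with the Spark 1.x API; the lookup table and input values are made up purely for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object SharedVariablesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SharedVariablesSketch"))

    // Broadcast variable: a read-only value cached on every node.
    val countryNames = sc.broadcast(Map("DE" -> "Germany", "FR" -> "France"))

    // Accumulator: only "added" to on the executors, read on the driver.
    val unknownCodes = sc.accumulator(0, "unknown country codes")

    val resolved = sc.parallelize(Seq("DE", "FR", "XX")).map { code =>
      countryNames.value.getOrElse(code, { unknownCodes += 1; "unknown" })
    }
    resolved.collect().foreach(println)

    // Accumulator values are only dependable after an action has run.
    println("unknown codes: " + unknownCodes.value)
    sc.stop()
  }
}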
In my case too, the accumulator was null in the closure when I used extends App to create a Spark application, as shown below:
import org.apache.spark.{SparkConf, SparkContext}

object AccTest extends App {
  val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val accum = sc.accumulator(0, "My Accumulator")
  // accum resolves to null inside this closure on the executors
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

  println("count:" + accum.value)
  sc.stop()
}
I replaced extends App with a main() method and it worked on a YARN cluster with HDP 2.4:
import org.apache.spark.{SparkConf, SparkContext}

object AccTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val accum = sc.accumulator(0, "My Accumulator")
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    println("count:" + accum.value)
    sc.stop()
  }
}