Accumulator fails on cluster, works locally

In the official Spark documentation, there is an example of an accumulator that is incremented inside a foreach call applied directly to an RDD:

scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

I implemented my own accumulator:

val myCounter = sc.accumulator(0)

val myRDD = sc.textFile(inputpath) // :spark.RDD[String]

myRDD.flatMap(line => foo(line)) // line 69

def foo(line: String) = {
   myCounter += 1  // line 82 throwing NullPointerException
   // compute something on the input
}
println(myCounter.value)

In a local setting, this works just fine. However, if I run this job on a Spark standalone cluster with several machines, the workers throw a

13/07/22 21:56:09 ERROR executor.Executor: Exception in task ID 247
java.lang.NullPointerException
    at MyClass$.foo(MyClass.scala:82)
    at MyClass$$anonfun$2.apply(MyClass.scala:67)
    at MyClass$$anonfun$2.apply(MyClass.scala:67)
    at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
    at spark.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:630)
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
    at spark.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:640)
    at spark.scheduler.ResultTask.run(ResultTask.scala:77)
    at spark.executor.Executor$TaskRunner.run(Executor.scala:98)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

at the line which increments the accumulator myCounter.

My question is: can accumulators only be used in "top-level" anonymous functions that are applied directly to RDDs, and not in nested functions? If so, why does my call succeed locally but fail on a cluster?

Edit: included the full stack trace of the exception.

asked Jul 22 '13 by ptikobj

People also ask

Do counters and accumulators perform the same function in Spark?

Accumulators are like global variables in a Spark application. In practice, accumulators are used as counters to keep track of something at the application level. They serve a very similar purpose to counters in MapReduce.
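
For illustration, here is a minimal sketch of an accumulator used as an application-level counter, written against the same pre-2.0 accumulator API that the question uses; the application name, input path and the "blank line" condition are placeholders, not taken from the question.

import org.apache.spark.{SparkConf, SparkContext}

object BlankLineCounter {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BlankLineCounter"))

    // Accumulator acting as an application-level counter.
    val blankLines = sc.accumulator(0, "Blank Lines")

    // "hdfs:///data/input.txt" is a placeholder path.
    val lines = sc.textFile("hdfs:///data/input.txt")

    // Increment the counter as a side effect of an action.
    lines.foreach(line => if (line.trim.isEmpty) blankLines += 1)

    // The value is only read on the driver, after the action has run.
    println("blank lines: " + blankLines.value)
    sc.stop()
  }
}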

What is true about accumulators in Spark?

Accumulators are variables that are only “added” to through an associative operation and can therefore, be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.
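
As a hedged sketch of how support for a new type can be added with this same pre-2.0 API, an AccumulatorParam supplies the zero value and the merge operation; the (count, sum) pair type and the sample numbers below are invented for the example.

import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Merge rule for a custom accumulator type: a (count, sum) pair,
// combined by element-wise addition.
object CountSumParam extends AccumulatorParam[(Long, Double)] {
  def zero(initialValue: (Long, Double)): (Long, Double) = (0L, 0.0)
  def addInPlace(v1: (Long, Double), v2: (Long, Double)): (Long, Double) =
    (v1._1 + v2._1, v1._2 + v2._2)
}

object CustomAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CustomAccumulatorExample"))

    // Pass the param explicitly so Spark knows how to merge values of this type.
    val stats = sc.accumulator((0L, 0.0))(CountSumParam)

    // Each element contributes (1, x) to the running (count, sum).
    sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0)).foreach(x => stats += ((1L, x)))

    val (count, sum) = stats.value
    println("count=" + count + " mean=" + sum / count)
    sc.stop()
  }
}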

What is the maximum number of parameters that can be passed in the accumulator?

The commonly cited limit is 255 parameters.

What is the difference between broadcast variables and accumulators in Spark?

Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
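
To make the contrast concrete, below is a small sketch that uses both kinds of shared variable in one job; the lookup table and the country codes are invented for the example.

import org.apache.spark.{SparkConf, SparkContext}

object SharedVariablesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SharedVariablesExample"))

    // Broadcast variable: a read-only lookup table cached on every executor.
    val countryNames = sc.broadcast(Map("DE" -> "Germany", "FR" -> "France"))

    // Accumulator: write-only from the executors' point of view.
    val unknownCodes = sc.accumulator(0, "Unknown country codes")

    val resolved = sc.parallelize(Seq("DE", "FR", "XX")).map { code =>
      countryNames.value.getOrElse(code, {
        // Note: accumulator updates inside transformations may be applied
        // more than once if a task is re-executed.
        unknownCodes += 1
        "unknown"
      })
    }

    resolved.collect().foreach(println)
    println("unknown codes: " + unknownCodes.value) // read on the driver only
    sc.stop()
  }
}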


1 Answer

In my case too, the accumulator was null inside the closure when I used 'extends App' to create the Spark application, as shown below:

import org.apache.spark.{SparkConf, SparkContext}

object AccTest extends App {

  val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")

  val accum = sc.accumulator(0, "My Accumulator")
  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

  println("count:" + accum.value)

  sc.stop
}

I replaced extends App with a main() method and it worked on a YARN cluster (HDP 2.4):

import org.apache.spark.{SparkConf, SparkContext}

object AccTest {

    def main(args: Array[String]): Unit = {

        val conf = new SparkConf().setAppName("AccTest").setMaster("yarn-client")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")

        val accum = sc.accumulator(0, "My Accumulator")
        sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

        println("count:" + accum.value)

        sc.stop
    }
}

This is a known pitfall: an object that extends App initializes its vals through DelayedInit, i.e. only when the program's entry point actually runs. The closure shipped to the executors references the object's accum field, and since that initialization never runs on the executors, the field is still null there. With an explicit main() method, accum is a local variable whose value is captured and serialized with the closure, so it is available on the executors.

answered Sep 20 '22 by sreedhar