When running a Spark job on a cluster, once the input grows past a certain size (~2.5 GB) I get either "Job cancelled because SparkContext was shut down" or "executor lost". In the YARN GUI, the job that got killed is shown as successful. There are no problems when running on 500 MB of data. While looking for a solution I found this: "seems yarn kills some of the executors as they request more memory than expected."
Any suggestions on how to debug it?
The command I submit my Spark job with:
/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments
and the SparkConf settings:
val sparkConf = (new SparkConf()
  .set("spark.driver.maxResultSize", "21g")
  .set("spark.akka.frameSize", "2011")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", configVar.sparkLogDir)
)
Simplified code that fails looks like this:
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val broadcastParser = sc.broadcast(new Parser())
val featuresRdd = hc.sql("select " + configVar.columnName + " from " + configVar.Table + " ORDER BY RAND() LIMIT " + configVar.Articles)
val myRdd: org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_, broadcastParser))

val allWords = featuresRdd
  .flatMap(line => line.split(" "))
  .count

val wordQuantiles = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map(pair => (pair._2, pair._2))
  .reduceByKey(_ + _)
  .sortBy(_._1)
  .collect
  .scanLeft((0, 0.0))((res, add) => (add._1, res._2 + add._2))
  .map(entry => (entry._1, entry._2 / allWords))

val dictionary = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // here I have an RDD of (word, count) tuples
  .filter(_._2 >= moreThan)
  .filter(_._2 <= lessThan)
  .filter(_._1.trim != "")
  .map(_._1)
  .zipWithIndex
  .collect
  .toMap
And the error stack:
Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
at sparkTesting.Runner$.main(Runner.scala:133)
at sparkTesting.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
A SparkContext can shut down for many different reasons, including code errors. Check your YARN resources; I suspect the cluster does not have enough resources to run the job. Spark may have to wait for Zeppelin and other YARN apps to finish. Note: the Zeppelin YARN app keeps running.
A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext should be active per JVM. You must stop() the active SparkContext before creating a new one.
The Spark driver program creates and uses the SparkContext to connect to the cluster manager, to submit Spark jobs, and to know which resource manager (YARN, Mesos or Standalone) to communicate with. It is the heart of a Spark application.
Found the answer.
My table was saved as a 20 GB Avro file. When the executors tried to open it, each of them had to load 20 GB into memory. I solved it by using CSV instead of Avro.
The symptoms are typical of an OutOfMemory error in one of the executor tasks. Try increasing the executor memory when launching the job; see the --executor-memory parameter of spark-submit, spark-shell, etc. The default value is 1G.
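To make the memory point concrete, here is a rough sketch in plain Scala (no Spark dependency) of how large a YARN container each executor ends up requesting. It assumes the Spark 1.5 default for spark.yarn.executor.memoryOverhead, which as far as I know is max(384 MB, 10% of the executor memory); the object and method names are made up for illustration, and YARN may additionally round the request up to its minimum allocation size.

```scala
// Sketch only: estimates the YARN container size behind --executor-memory.
// The overhead constants are assumed Spark 1.5 defaults; check your own version.
object YarnContainerEstimate {
  val OverheadMinMb = 384    // assumed minimum off-heap overhead, in MB
  val OverheadFactor = 0.10  // assumed overhead fraction of executor memory

  // Executor heap plus the overhead Spark adds to the container request.
  def containerMb(executorMemoryMb: Int): Int =
    executorMemoryMb + math.max(OverheadMinMb, (OverheadFactor * executorMemoryMb).toInt)

  def main(args: Array[String]): Unit = {
    val executorMemoryMb = 6 * 1024 // --executor-memory 6g from the question
    val numExecutors = 15           // --num-executors 15 from the question
    val perContainer = containerMb(executorMemoryMb)
    println(s"per-executor container: $perContainer MB")
    println(s"all executors together: ${perContainer.toLong * numExecutors} MB")
  }
}
```

If the per-executor figure is above what YARN allows for a single container, or the total is above what the queue can provide, executors can fail to start or be killed. If the heap is fine but YARN still kills containers for exceeding memory limits, raising spark.yarn.executor.memoryOverhead (a value in MB) is the usual knob to try.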
Another possible cause of the "SparkContext is shutdown" error is that you are importing a jar file after evaluating some other code. (This may only happen in Spark Notebook.)
To fix the problem, move all your :cp myjar.jar statements to the start of your file.