
Spark job fails with java.io.NotSerializableException: org.apache.spark.SparkContext

I am facing the above exception when trying to apply a method (computeDwt) on an RDD[(Int, ArrayBuffer[(Int, Double)])] input. I even extend Serialization to serialize objects in Spark. Here is the code snippet:

input: series: RDD[(Int, ArrayBuffer[(Int, Double)])]
DWTsample extends Serialization is a class that has the computeDwt function.
sc: SparkContext

val kk: RDD[(Int, List[Double])] = series.map(t => (t._1, new DWTsample().computeDwt(sc, t._2)))

Error:
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)

Could anyone suggest what the problem might be and what should be done to overcome this issue?

asked May 12 '14 by yh18190

People also ask

What is SparkContext in Apache Spark?

A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.
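
As a hedged illustration of that lifecycle, here is a minimal driver-side sketch (the app name and master URL are placeholders, not taken from the question):

import org.apache.spark.{SparkConf, SparkContext}

// Created on the driver; only one SparkContext may be active per JVM.
val conf = new SparkConf().setAppName("example").setMaster("local[*]")
val sc = new SparkContext(conf)

// RDDs, accumulators, and broadcast variables are created through it.
val rdd = sc.parallelize(1 to 10)
println(rdd.count())

// stop() the active context before creating a new one.
sc.stop()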


1 Answer

The line

series.map(t => (t._1, new DWTsample().computeDwt(sc, t._2)))

references the SparkContext (sc), but SparkContext isn't serializable. SparkContext is designed to expose operations that run on the driver; it can't be referenced or used by code that runs on workers.

You'll have to restructure your code so that sc isn't referenced in your map function closure, for example as in the sketch below.
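
Here is a minimal sketch of one possible restructuring, assuming computeDwt only needs the per-record data and not the SparkContext; the method body is a placeholder, and dropping the sc parameter (and extending the standard Serializable trait) is my assumption, not code from the question:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

// Hypothetical refactor: computeDwt no longer takes a SparkContext,
// so the map closure captures only serializable objects.
class DWTsample extends Serializable {
  def computeDwt(data: ArrayBuffer[(Int, Double)]): List[Double] =
    data.map(_._2).toList // placeholder for the real DWT computation
}

// series is the asker's RDD[(Int, ArrayBuffer[(Int, Double)])].
val kk: RDD[(Int, List[Double])] =
  series.map { case (key, values) => (key, new DWTsample().computeDwt(values)) }

If computeDwt genuinely needs to run Spark operations, that work has to happen on the driver instead (for example on collected data), since transformations running on workers cannot use the SparkContext.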

answered Oct 13 '22 by Josh Rosen