I am trying to read data from a static text file stored in HDFS, store its contents in an ArrayBuffer, and broadcast that buffer via sparkContext.broadcast as a broadcast variable. I am using Cloudera's Spark, version 1.6.0-cdh5.7.0, with spark-streaming_2.10.
I start the application on YARN using spark-submit:
spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client --conf spark.executor.userClassPathFirst=true current.jar
When I do this, I get a java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator (full stack trace below). The same code used with a hard-coded ArrayBuffer (BroadcastStreamTest2 below) works perfectly, so I assume it has something to do with the static file resource. Does anyone have an idea what I am possibly doing wrong? Any help is appreciated.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BroadcastStreamTest1 {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
    val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

    // Read the static file from HDFS on the driver and collect its lines into a buffer.
    val content = streamingContext.sparkContext
      .textFile("hdfs:///data/someTextFile.txt")
      .collect()
      .toBuffer[String]

    val broadCastVar = streamingContext.sparkContext.broadcast(content)
    broadCastVar.value.foreach(line => println(line))

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object BroadcastStreamTest2 {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
    val streamingContext = new StreamingContext(sparkConf, batchDuration = Seconds(10))

    // Build the same kind of buffer in memory instead of reading it from HDFS.
    val content = new mutable.ArrayBuffer[String]
    (1 to 50).foreach(i => content += "line" + i)

    val broadCastVar = streamingContext.sparkContext.broadcast(content)
    broadCastVar.value.foreach(line => println(line))

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
16/04/25 10:09:59 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, n525.hadoop.mxint.net): java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
    at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
    ... 30 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
    at net.meetrics.dada.streaming.application.BroadcastStreamTest1$.main(BroadcastStreamTest1.scala:14)
    at net.meetrics.dada.streaming.application.BroadcastStreamTest1.main(BroadcastStreamTest1.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1208)
    at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.Some to field org.apache.spark.Accumulable.name of type scala.Option in instance of org.apache.spark.Accumulator
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1205)
    ... 30 more
The variable "power" is declared as BroadCast Variable by using the broadcast() function. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
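For illustration only, a minimal sketch of that pattern (the object name, the countryNames map, and the input codes are all made up for this example): the lookup table is broadcast once, and each task reads the cached copy through .value instead of having its own serialized copy shipped with the task.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastLookupSketch"))

    // Hypothetical lookup table, broadcast to every executor once.
    val countryNames = Map("DE" -> "Germany", "FR" -> "France", "US" -> "United States")
    val countryNamesBc = sc.broadcast(countryNames)

    // Each task reads the cached broadcast value via .value inside the closure.
    val resolved = sc.parallelize(Seq("DE", "US", "XX"))
      .map(code => countryNamesBc.value.getOrElse(code, "unknown"))
      .collect()

    resolved.foreach(println)
    sc.stop()
  }
}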
Every time the underlying data changes, you need to call an update function, i.e. re-broadcast, to get a new broadcast variable.
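A rough sketch of that update pattern, assuming the same HDFS file as in the question (the socket source, the port, and the loadAndBroadcast helper are invented for illustration): the old broadcast is dropped and a fresh one is created before each batch is processed.

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object RebroadcastSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf(), Seconds(10))

    // Hypothetical helper: re-reads the reference file and broadcasts a fresh copy.
    def loadAndBroadcast(): Broadcast[mutable.Buffer[String]] =
      ssc.sparkContext.broadcast(
        ssc.sparkContext.textFile("hdfs:///data/someTextFile.txt").collect().toBuffer)

    var referenceData = loadAndBroadcast()

    val lines = ssc.socketTextStream("localhost", 9999) // example input source
    lines.foreachRDD { rdd =>
      // Runs on the driver: drop the stale copy and broadcast the current file content.
      referenceData.unpersist()
      referenceData = loadAndBroadcast()
      val bc = referenceData
      rdd.filter(line => bc.value.contains(line)).foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}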
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums. This guide shows each of these features in each of Spark's supported languages.
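For contrast with the broadcast examples above, and because the exception here is thrown while deserializing an Accumulator, a minimal accumulator sketch using the Spark 1.x sc.accumulator API (the object name is made up; the file path is reused from the question):

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("AccumulatorSketch"))

    // Driver-side counter; tasks only add to it, the driver reads the result.
    val emptyLines = sc.accumulator(0L, "emptyLines")

    sc.textFile("hdfs:///data/someTextFile.txt")
      .foreach(line => if (line.trim.isEmpty) emptyLines += 1L)

    println(s"empty lines: ${emptyLines.value}")
    sc.stop()
  }
}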
The reason was some kind of conflict with my provided jar file.
Without setting
spark.executor.userClassPathFirst=true
it works; unfortunately, I could not locate the exact cause of the problem.
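For completeness, the submission that works here is the same command as above, just without that flag:

spark-submit --class my.package.BroadcastStreamTest1 --master yarn --deploy-mode client current.jar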