The variables I share via broadcast are null in the cluster.
My application is quite complex, but I have written this small example that works flawlessly when I run it locally, but it fails in the cluster:
package com.gonzalopezzi.bigdata.bicing
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
object PruebaBroadcast2 extends App {
val conf = new SparkConf().setAppName("PruebaBroadcast2")
val sc = new SparkContext(conf)
val arr : Array[Int] = (6 to 9).toArray
val broadcasted = sc.broadcast(arr)
val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2) // a small integer array [1, 2, 3, 4] is paralellized in two machines
rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println) // NullPointerException in the flatmap. broadcasted is null
}
I don't know if the problem is a coding error or a configuration issue.
This is the stacktrace I get:
15/07/07 20:55:13 INFO scheduler.DAGScheduler: Job 0 failed: collect at PruebaBroadcast2.scala:24, took 0.992297 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, ip-172-31-36-49.ec2.internal): java.lang.NullPointerException
at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
at com.gonzalopezzi.bigdata.bicing.PruebaBroadcast2$$anonfun$2.apply(PruebaBroadcast2.scala:24)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Command exiting with ret '1'
Can anyone help me fix this? At least, can you tell me if you see something strange in the code? If you think the code is ok, please tell me, as it would mean that the problem is in the configuration of the cluster.
Thanks in advance.
Finally I got it working.
It doesn't work declaring the object like this:
object MyObject extends App {
But it works, if you declare an object with a main function:
object MyObject {
def main (args : Array[String]) {
/* ... */
}
}
So, the short example in the question works if I rewrite it this way:
object PruebaBroadcast2 {
def main (args: Array[String]) {
val conf = new SparkConf().setAppName("PruebaBroadcast2")
val sc = new SparkContext(conf)
val arr : Array[Int] = (6 to 9).toArray
val broadcasted = sc.broadcast(arr)
val rdd : RDD[Int] = sc.parallelize((1 to 4).toSeq, 2)
rdd.flatMap((a : Int) => List((a, broadcasted.value(0)))).reduceByKey(_+_).collect().foreach(println)
}
}
This problem seems related to this bug: https://issues.apache.org/jira/browse/SPARK-4170
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With