All right, so I've asked a somewhat similar question about how Spark handles exceptions internally, but the example I had back then wasn't really clear or complete. An answer there pointed me in some direction, but I can't really explain some things.

I've set up a dummy Spark Streaming app, and in the transform stage I have a russian-roulette expression, which may or may not throw an exception. If an exception is thrown, I stop the Spark Streaming context. That's it: no other logic, no RDD transformation.
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ImmortalStreamingJob extends App {
  val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(1))

  // Feed the stream from a queue of small RDDs: 1..1000 in batches of 10.
  val elems = (1 to 1000).grouped(10)
    .map(seq => ssc.sparkContext.parallelize(seq))
    .toSeq
  val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

  val transformed = stream.transform { rdd =>
    try {
      // Russian roulette: roughly one batch in six blows up.
      if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
      else println("lucky bastard")
      rdd
    } catch {
      case e: Throwable =>
        println(s"stopping streaming context: $e")
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        throw e
    }
  }

  transformed.foreachRDD { rdd =>
    println(rdd.collect().mkString(","))
  }

  ssc.start()
  ssc.awaitTermination()
}
Running this in IntelliJ will throw the exception at some point. The fun part:
- if the exception is thrown while the first RDD is being processed, the Spark context is stopped and the app dies, which is what I want;
- if the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops, which is not what I want.

Why does the app hang instead of dying in the second case?
I'm running Spark 2.1.0 on Scala 2.11.8. Taking the try-catch out entirely solves the problem (Spark stops by itself). Also, moving the try-catch inside foreachRDD solves the problem, as sketched below.
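For reference, this is roughly what the foreachRDD variant looks like (my reconstruction, not the original code; it reuses the `stream` and `ssc` values from the full example above):

// Workaround sketch: the try-catch moves from the transformation into the action.
stream.foreachRDD { rdd =>
  try {
    if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
    println(rdd.collect().mkString(","))
  } catch {
    case e: Throwable =>
      println(s"stopping streaming context: $e")
      ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}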
However, I'm looking for an answer that can help me understand what's going on in this particular example.
You will only see exceptions manifest themselves in actions (like foreachRDD in this case) rather than in transformations (like transform in this case) because transformations are evaluated lazily: they don't execute until an action forces them. The reason why this is necessary demands changing your mental model of how distributed processing works.
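To illustrate the laziness (my own snippet, assuming `sc` is an existing SparkContext such as ssc.sparkContext above):

// Nothing blows up here: map only records the transformation.
val doomed = sc.parallelize(1 to 10).map { i =>
  if (i == 5) throw new RuntimeException("boom") else i
}

// The exception only surfaces now, when an action forces evaluation.
doomed.collect()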
Consider a conventional, single-threaded program. Code proceeds line-by-line, and if an exception is thrown and not handled, subsequent lines of code just don't execute. In a distributed system where the same Spark transformations are running in parallel on multiple machines (and at different paces), what should happen when an exception is thrown? It's not so simple since the exception on one machine is independent of the code proceeding on other machines, which is how you want it. To want all the independent tasks spread throughout a cluster to just shut down on an exception is simply single-machine thinking that doesn't translate to a distributed paradigm. How is the driver supposed to deal with that?
According to Matei Zaharia, now of Databricks and one of the creators of Spark back at Berkeley, "Exceptions should be sent back to the driver program and logged there (with a SparkException thrown if a task fails more than 4 times)." (Incidentally, this default number of retries can be changed with spark.task.maxFailures.) So if Log4J is properly configured on the executors, the exception will be logged there; then it will be serialized and sent back to the driver, which will retry the task three more times by default.
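For example (my own snippet, not from the answer), the retry limit can be changed on the SparkConf before the context is created:

import org.apache.spark.SparkConf

// Raise the per-task failure limit from the default of 4 to 8; once a task
// fails that many times, the stage is aborted with a SparkException.
val conf = new SparkConf()
  .setAppName("retry-demo")  // hypothetical app name
  .set("spark.task.maxFailures", "8")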
In your particular situation, I would guess a couple of things are going on. First, you are running on a single machine, which gives a misleading picture of how exception handling works in a distributed model. Second, you are stopping the context prematurely. Stopping the context is an extremely destructive operation, which includes stopping all your listeners and the DAGScheduler. Frankly, I don't know how you can expect Spark to wrap everything up so neatly when you've basically turned out the lights.
Finally, I would mention that a more elegant exception handling model might be executing your transformations inside a Try. You will end up with potentially more cumbersome code, in that your transformations will return RDD[Try[T]] or DStream[Try[T]], which means you will have to handle the Success and Failure cases for each element. But you will be able to propagate success and error information downstream with all the benefits a monad provides, including mapping RDD[Try[A]] => RDD[Try[B]] and even using for comprehensions (by virtue of flatMap).
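A minimal sketch of that pattern (my illustration; `sc` is an existing SparkContext, and the input strings and parsing logic are made up):

import scala.util.{Failure, Success, Try}

// Wrap each risky per-element computation in a Try, so failures
// travel through the pipeline as values instead of killing tasks.
val attempts = sc.parallelize(Seq("1", "2", "boom", "4"))
  .map(s => Try(s.toInt))                      // RDD[Try[Int]]

// Try is a monad: errors propagate untouched through further maps.
val doubled = attempts.map(t => t.map(_ * 2))  // still RDD[Try[Int]]

// Handle the Success and Failure cases explicitly at the end.
doubled.collect().foreach {
  case Success(n) => println(s"got $n")
  case Failure(e) => println(s"bad record: ${e.getMessage}")
}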