 

"Immortal" Spark Streaming Job?

All right, so I've asked a somewhat similar question related to how Spark handles exceptions internally, but the example I had back then wasn't really clear or complete. An answer there pointed me in some direction but I can't really explain some things.

I've set up a dummy Spark Streaming app, and in the transform stage I have a Russian-roulette expression which might or might not throw an exception. If an exception is thrown, I stop the Spark streaming context. That's it: no other logic, no RDD transformation.

import scala.collection.mutable
import scala.util.Random

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ImmortalStreamingJob extends App {
  val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
  val ssc  = new StreamingContext(conf, Seconds(1))

  // Feed the stream from a queue of small, pre-built RDDs.
  val elems = (1 to 1000).grouped(10)
    .map(seq => ssc.sparkContext.parallelize(seq))
    .toSeq
  val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

  val transformed = stream.transform { rdd =>
    try {
      // Russian roulette: roughly one batch in six blows up.
      if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
      else println("lucky bastard")
      rdd
    } catch {
      case e: Throwable =>
        println(s"stopping streaming context: $e")
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        throw e
    }
  }

  transformed.foreachRDD { rdd =>
    println(rdd.collect().mkString(","))
  }

  ssc.start()
  ssc.awaitTermination()
}

Running this in IntelliJ will throw the exception at some point. The fun part:

  • if the exception is thrown in the first transformation (when the first RDD is processed), the spark context is stopped and the app dies, which is what I want
  • if the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops, which is not what I want

Why does the app hang instead of dying in the second case?


I'm running Spark 2.1.0 on Scala 2.11.8. Removing the try-catch solves the problem (Spark stops by itself). Moving the try-catch inside foreachRDD also solves the problem (see the sketch below).
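For clarity, a rough sketch of that second workaround (reusing stream and ssc from the code above), with the roulette and the try-catch both living inside the action instead of inside transform:

stream.foreachRDD { rdd =>
  try {
    // Same roulette, but now it runs inside the action on the driver.
    if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
    println(rdd.collect().mkString(","))
  } catch {
    case e: Throwable =>
      println(s"stopping streaming context: $e")
      ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}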

However I'm looking for an answer that can help me understand what's going on in this particular example.

Andrei T. asked Apr 07 '17


1 Answer

You will only see exceptions manifest themselves in actions (like foreachRDD here) rather than in transformations (like transform here), because transformations are evaluated lazily: they don't actually run until an action forces them. Understanding why this is necessary requires changing your mental model of how distributed processing works.
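As a minimal, standalone illustration of that laziness (a sketch, not code from the question; it creates its own throwaway local context):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: a throwaway local context for the illustration.
val sc = new SparkContext(new SparkConf().setAppName("lazy-demo").setMaster("local[*]"))

// Nothing is thrown here: map is a transformation and only records the lineage.
val failing = sc.parallelize(1 to 10)
  .map(i => if (i == 5) throw new RuntimeException("boom") else i)

// The exception only surfaces once an action forces evaluation.
failing.collect()   // fails with a SparkException wrapping the RuntimeException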

Consider a conventional, single-threaded program. Code proceeds line-by-line, and if an exception is thrown and not handled, subsequent lines of code just don't execute. In a distributed system where the same Spark transformations are running in parallel on multiple machines (and at different paces), what should happen when an exception is thrown? It's not so simple since the exception on one machine is independent of the code proceeding on other machines, which is how you want it. To want all the independent tasks spread throughout a cluster to just shut down on an exception is simply single-machine thinking that doesn't translate to a distributed paradigm. How is the driver supposed to deal with that?

According to Matei Zaharia, now of Databricks and one of the creators of Spark back at Berkeley, "Exceptions should be sent back to the driver program and logged there (with a SparkException thrown if a task fails more than 4 times)." (Incidentally, this default number of retries can be changed with spark.task.maxFailures.) So if Log4J is properly configured on the executors, the exception will be logged there; it will then be serialized and sent back to the driver, which will retry the task up to 3 more times by default.
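For reference, a quick sketch of raising that retry budget via configuration (this matters on a real cluster; as far as I know, with a plain local[*] master like the one in the question, failed tasks are not retried at all):

// Sketch: bump the per-task retry budget from the default of 4.
// (Master/deploy mode would come from spark-submit on a real cluster.)
val conf = new SparkConf()
  .setAppName("fun-spark")
  .set("spark.task.maxFailures", "8")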

In your particular situation, I would guess you have a couple of things going on. First, you are running on a single machine, which will give a misleading picture of how exception handling works in a distributed model. Second, you are stopping the context prematurely. Stopping the context is an extremely destructive operation, which includes stopping all your listeners and the DAGScheduler. Frankly, I don't know how you can expect Spark to wrap everything up so neatly when you've basically turned out the lights.

Finally, I would mention that a more elegant exception handling model might be to execute your transformations inside a Try. You will end up with potentially more cumbersome code, in that your transformations will return RDD[Try[T]] or DStream[Try[T]], which means you will have to handle the Success and Failure cases for each element. But you will be able to propagate success and error information downstream with all the benefits a monad provides, including mapping RDD[Try[A]] => RDD[Try[B]] and even using for comprehensions (by virtue of flatMap).
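A rough sketch of what that might look like, reusing the stream from the question; riskyComputation is just a hypothetical placeholder for whatever per-element work might fail:

import scala.util.{Failure, Success, Try}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical per-element work that may blow up.
def riskyComputation(i: Int): Int =
  if (i % 7 == 0) throw new RuntimeException(s"boom at $i") else i * 2

// Failures become data instead of killing the task.
val safe: DStream[Try[Int]] = stream.transform { rdd =>
  rdd.map(i => Try(riskyComputation(i)))
}

safe.foreachRDD { rdd =>
  rdd.collect().foreach {
    case Success(value) => println(s"ok: $value")
    case Failure(e)     => println(s"failed: ${e.getMessage}")
  }
}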

Vidya answered Oct 10 '22