
Uncaught Exception Handling in Spark

I am working on a Java-based Spark Streaming application which responds to messages that come through a Kafka topic. For each message, the application does some processing and writes the results back to a different Kafka topic.

Sometimes, due to unexpected data-related issues, the code that operates on RDDs might fail and throw an exception. When that happens, I would like to have a generic handler that could take the necessary action and drop a message to an error topic. Right now, these exceptions are only written to Spark's log by Spark itself.

What is the best approach to do this, instead of writing try-catch blocks around every piece of code that works on RDDs?

asked Oct 21 '15 by Yohan Liyanage



2 Answers

You could write a generic function that does this. You only need to wrap it around RDD actions, since those are the only operations that can throw Spark exceptions (transformations like .map and .filter are lazy and only execute when an action runs).
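
For example, a failure inside a transformation only surfaces once an action materializes the RDD. A minimal sketch (the sample data here is made up for illustration):

import org.apache.spark.SparkContext

val sc: SparkContext = ??? // an existing SparkContext
val lines = sc.parallelize(Seq("1", "2", "not-a-number"))
val parsed = lines.map(_.toInt) // no failure yet: map is lazy
parsed.count()                  // the job fails here, with a SparkException wrapping the NumberFormatException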

(Assuming this is in Scala.) You could maybe even try something with implicits: make a class that holds an RDD and handles the error. Here is a sketch of what that might look like:

import scala.util.Try
import org.apache.spark.rdd.RDD

// Enriches any RDD in scope with a failsafeAction method.
implicit class FailSafeRDD[T](rdd: RDD[T]) {
  // Runs the given action and captures any thrown exception in a Try.
  def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try {
    fn(rdd)
  }
}

You could add error-topic messaging into failsafeAction, or anything else you want to do on every failure. Usage might then look like:

val rdd = ??? // Some rdd you already have
val resultOrException = rdd.failsafeAction { r => r.count() }
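
A sketch of what wiring the error topic into failsafeAction could look like (the broker address, serializers, and topic name below are illustrative assumptions, not part of the original answer):

import java.util.Properties
import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative producer settings; adjust for your cluster.
val errorProducerProps = new Properties()
errorProducerProps.put("bootstrap.servers", "localhost:9092")
errorProducerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
errorProducerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

implicit class FailSafeRDD[T](rdd: RDD[T]) {
  def failsafeAction[U](fn: RDD[T] => U): Try[U] = {
    val result = Try(fn(rdd))
    // On failure, drop a message onto the (hypothetical) error topic.
    // This runs on the driver, where actions are triggered.
    result.failed.foreach { e =>
      val producer = new KafkaProducer[String, String](errorProducerProps)
      producer.send(new ProducerRecord("error-topic", e.getMessage))
      producer.close()
    }
    result
  }
}

The caller can then match on the returned Try (Success or Failure) to decide whether to continue processing or skip the message.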

Besides this though, I imagine the "best" approach depends somewhat on the application's needs.

answered Oct 13 '22 by Rich


I think you could also implement this with a try/catch:

dstream.foreachRDD { case rdd: RDD[String] =>
  rdd.foreach { case string: String =>
    try {
      // Happy path: build the message and send it to the output topic.
      val kafkaProducer = ...
      val msg = ...
      kafkaProducer.send(msg)
    } catch {
      case d: DataException =>
        // Data-related failure: send the message to the error topic instead.
        val kafkaErrorProducer = ...
        val errorMsg = ...
        kafkaErrorProducer.send(errorMsg)
      case t: Throwable =>
        // further error handling
    }
  }
}
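
One caveat: the code above creates a producer per record, which is expensive. A common variant creates one producer per partition with foreachPartition; in this sketch, producerProps, errorProducerProps, process, and the topic names are all placeholder assumptions:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One producer per partition instead of one per record (placeholder configs).
    val producer = new KafkaProducer[String, String](producerProps)
    val errorProducer = new KafkaProducer[String, String](errorProducerProps)
    records.foreach { record =>
      try {
        // process is a placeholder for the per-message transformation.
        producer.send(new ProducerRecord("output-topic", process(record)))
      } catch {
        case d: DataException => // same placeholder exception type as above
          errorProducer.send(new ProducerRecord("error-topic", record))
        case t: Throwable =>
          // further error handling
      }
    }
    producer.close()
    errorProducer.close()
  }
}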

answered Oct 13 '22 by Patrick McGloin