
How to handle exceptions in Spark and Scala

I'm trying to handle common exceptions in Spark, like a .map operation not working correctly on all elements of the data, or a FileNotFoundException. I have read all the existing questions and the following two posts:

https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

I have tried wrapping the line

attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, ...)

in a Try, so that it reads

attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, ...))

But that won't compile: the compiler then no longer recognises the .toDF() call later on. I have also tried a Java-style try { ... } catch { ... } block, but I can't get the scope right; df is then not returned. Does anyone know how to do this properly? Do I even need to handle these exceptions? The Spark framework already seems to deal with a FileNotFoundException without me adding one. But I would, for example, like to generate an error reporting the number of fields in the schema if the input file has the wrong number of columns.

Here's the code:

object DataLoadTest extends SparkSessionWrapper {

  /** Helper function to create a DataFrame from a text file, re-used in subsequent tests */
  def createDataFrame(fileName: String): DataFrame = {

    import spark.implicits._

    //try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\\t"))
      // mHealthUser is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    df
    // The attempted catch block, which doesn't compile in this scope:
    //} catch {
    //  case ex: FileNotFoundException => println(s"File $fileName not found")
    //  case unknown: Exception => println(s"Unknown exception: $unknown")
    //}
  }
}

All suggestions appreciated. Thanks!

Asked by LucieCBurgess
1 Answer

Another option is to use Scala's Try type.

For example:

import java.io.FileNotFoundException
import scala.util.{Try, Success, Failure}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    // create DataFrame df here
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}

Now, on the caller side, handle it like this:

createDataFrame("file1.csv") match {
  case Success(df) => {
    // proceed with your pipeline
  }
  case Failure(ex) => //handle exception
}
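
As a side note on the question's original attempt: wrapping the whole mHealthUser construction in Try yields an RDD[Try[mHealthUser]], for which Spark has no implicit Encoder, which is most likely why the compiler stopped recognising .toDF(). One way around that is to unwrap the Try per record before converting. Here is a minimal sketch, assuming the spark session from the question's SparkSessionWrapper, and using a hypothetical three-field Reading case class in place of the 24-field mHealthUser:

import scala.util.Try
import org.apache.spark.sql.DataFrame

// Hypothetical stand-in for the question's 24-field mHealthUser schema
case class Reading(x: Double, y: Double, z: Double)

def createDataFrame(fileName: String): DataFrame = {
  import spark.implicits._

  spark.sparkContext
    .textFile("/path/to/file" + fileName)
    .map(_.split("\\t"))
    // Try(...).toOption is None for any row that fails to parse, and
    // flatMap drops the Nones, leaving an RDD[Reading] that .toDF() can encode
    .flatMap(attributes =>
      Try(Reading(attributes(0).toDouble,
                  attributes(1).toDouble,
                  attributes(2).toDouble)).toOption)
    .toDF()
    .cache()
}

Note that this silently discards malformed rows; if the failures themselves matter, keep the Try values and split the RDD into successes and failures instead.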

This is slightly better than returning an Option, because the caller learns the reason for the failure and can handle it accordingly.
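
As for the question's wish to raise a clear error when the input file has the wrong number of columns: one option (a sketch only; validateArity is a hypothetical helper, not part of any Spark API) is to check the field count before building the case class. The task then fails with a descriptive IllegalArgumentException, which Spark surfaces wrapped in a SparkException when the job runs:

// Hypothetical helper: fails fast when a row has the wrong number of fields
def validateArity(expected: Int)(attributes: Array[String]): Array[String] = {
  require(attributes.length == expected,
    s"Expected $expected fields but got ${attributes.length}")
  attributes
}

// Usage, inserted between the split and the case-class construction;
// 24 is the field count of the mHealthUser schema in the question:
//   .map(_.split("\\t"))
//   .map(validateArity(24))
//   .map(attributes => mHealthUser(...))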

Answered by Neeraj Malhotra