I'm trying to handle common exceptions in Spark, such as a .map operation failing on some elements of the data, or a FileNotFoundException. I have read all the existing questions and the following two posts:
https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html
https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark
I have tried wrapping the line

attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, ...)

in a Try, so that it reads

attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, ...))

But that won't compile: the element type of the RDD becomes Try[mHealthUser], for which there is no encoder, so the compiler no longer recognises the .toDF() call later. I have also tried a Java-style try { ... } catch { ... } block, but I can't get the scope right; df is then not returned from every branch. Does anyone know how to do this properly? Do I even need to handle these exceptions, given that the Spark framework already seems to deal with a FileNotFoundException without me adding one? I would, however, like to raise an error that reports the expected number of schema fields when the input file has the wrong number of columns, for example.
Here's the code:
import java.io.FileNotFoundException
import org.apache.spark.sql.DataFrame

object DataLoadTest extends SparkSessionWrapper {

  /** Helper function to create a DataFrame from a text file, re-used in subsequent tests. */
  def createDataFrame(fileName: String): DataFrame = {
    import spark.implicits._
    try {
      val df = spark.sparkContext
        .textFile("/path/to/file" + fileName)
        .map(_.split("\\t"))
        // mHealthUser is the case class which defines the data schema
        .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
          attributes(3).toDouble, attributes(4).toDouble,
          attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
          attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
          attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
          attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
          attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
          attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
          attributes(23).toInt))
        .toDF()
        .cache()
      df
    } catch {
      // these branches return Unit, so the method no longer returns a DataFrame
      case ex: FileNotFoundException => println(s"File $fileName not found")
      case unknown: Exception => println(s"Unknown exception: $unknown")
    }
  }
}
All suggestions appreciated. Thanks!
Another option would be to use the Try type in Scala.
For example:
import java.io.FileNotFoundException
import org.apache.spark.sql.DataFrame
import scala.util.{Failure, Success, Try}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    // create the DataFrame df here, as in the question
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}
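As an aside, scala.util.Try's apply method builds the same Success/Failure without the manual try/catch. A minimal sketch, where df stands for the same DataFrame construction shown in the question:

import scala.util.Try

def createDataFrame(fileName: String): Try[DataFrame] = Try {
  // build the DataFrame here exactly as in the question; any exception
  // thrown while constructing it is captured and returned as a Failure
  df
}

The println logging then moves to the caller's Failure branch.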
Now, on the caller side, handle it like this:
createDataFrame("file1.csv") match {
  case Success(df) =>
    // proceed with your pipeline
  case Failure(ex) =>
    // handle the exception
}
This is slightly better than using Option, as the caller knows the reason for the failure and can handle it accordingly.
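As for raising a clear error when the input file has the wrong number of columns, one approach is to validate the column count per row and wrap the per-record parse in a Try. A sketch of the body of createDataFrame, where expectedFields = 24 is an assumption matching the mHealthUser constructor shown above, and import spark.implicits._ must be in scope as before:

import scala.util.Try

val expectedFields = 24 // assumption: mHealthUser has 24 fields

val df = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\\t"))
  .flatMap { attributes =>
    // fail fast, reporting the expected column count in the message
    require(attributes.length == expectedFields,
      s"Expected $expectedFields columns but found ${attributes.length}")
    // drop records whose individual fields fail to parse
    Try(mHealthUser(
      attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, attributes(3).toDouble,
      attributes(4).toDouble, attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
      attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble, attributes(11).toDouble,
      attributes(12).toDouble, attributes(13).toDouble, attributes(14).toDouble, attributes(15).toDouble,
      attributes(16).toDouble, attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
      attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble, attributes(23).toInt
    )).toOption
  }
  .toDF()
  .cache()

Note that because Spark evaluates lazily, the require failure only surfaces once an action runs on df, not when the DataFrame is defined.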