 

How to handle exceptions in Spark's map() function?

I want to ignore exceptions in the map() function, for example:

rdd.map(_.toInt)

where rdd is an RDD[String].

But if it encounters a non-numeric string, it fails with a NumberFormatException.

What is the easiest way to ignore any exception and skip that line? (I do not want to use filter to prevent the exception, because there may be many other kinds of exceptions...)

asked May 04 '15 06:05 by user2848932




2 Answers

You can use a combination of Try and map/filter.

Try wraps your computation in a Success if it completes normally, or in a Failure if it throws an exception. You can then filter for what you want: in this case the successful computations, but you could also keep the failures, for example for logging.

The following code is a possible starting point. You can run it on scastie.org to see whether it fits your needs.

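import scala.util.Try

object Main extends App {

  val in = List("1", "2", "3", "abc")

  // Success(n) for numeric strings, Failure(NumberFormatException) for "abc"
  val out1 = in.map(a => Try(a.toInt))
  // Keep only the successful conversions and unwrap their values
  val results = out1.filter(_.isSuccess).map(_.get)

  println(results) // List(1, 2, 3)

}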
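For an RDD, the filter-then-get pattern can be collapsed into a single flatMap, because Try(...).toOption turns a Failure into None, which flatMap then drops. The sketch below runs on a plain List so it needs no Spark dependency; the assumption is that the same lambda carries over unchanged to an RDD[String], i.e. rdd.flatMap(s => Try(s.toInt).toOption). It also shows the logging variant by keeping the Failure cases together with their input. The object and method names are illustrative only. Note that Try catches only non-fatal exceptions (it matches on NonFatal internally), which fits the question's wish to skip any ordinary exception without listing them.

```scala
import scala.util.{Failure, Try}

object TryFlatMap {

  // Failures become None and are dropped by flatMap.
  // Assumed RDD equivalent: rdd.flatMap(s => Try(s.toInt).toOption)
  def parseInts(in: Seq[String]): Seq[Int] =
    in.flatMap(s => Try(s.toInt).toOption)

  // Pair each input that failed to parse with its exception message, e.g. for logging.
  def failures(in: Seq[String]): Seq[(String, String)] =
    in.map(s => (s, Try(s.toInt))).collect {
      case (s, Failure(e)) => (s, e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    val in = List("1", "2", "3", "abc")
    println(parseInts(in)) // List(1, 2, 3)
    failures(in).foreach { case (s, msg) => println(s"skipped '$s': $msg") }
  }
}
```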
answered Oct 22 '22 03:10 by gamsd


I recommend using filter followed by map:

import org.apache.commons.lang3.math.NumberUtils

rdd.filter(r => NumberUtils.isNumber(r)).map(r => r.toInt)

or flatMap:

rdd.flatMap(r => if (NumberUtils.isNumber(r)) Some(r.toInt) else None)
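One caveat with NumberUtils.isNumber: it accepts any valid Java number literal, so an input such as "1.5" passes the check and toInt still throws. Assuming the project is on Scala 2.13 or later, StringOps.toIntOption closes that gap without the Commons Lang dependency: it returns None for anything that is not a valid Int, including out-of-range values. The sketch uses a List; on an RDD[String] the assumed equivalent is rdd.flatMap(_.toIntOption). The object name is illustrative.

```scala
object ToIntOption {

  // Requires Scala 2.13+: toIntOption yields None instead of throwing.
  def parseInts(in: Seq[String]): Seq[Int] =
    in.flatMap(_.toIntOption)

  def main(args: Array[String]): Unit = {
    // "1.5" would pass NumberUtils.isNumber but is not an Int;
    // "99999999999" is numeric but overflows Int.
    val in = List("1", "1.5", "99999999999", "abc", "-7")
    println(parseInts(in)) // List(1, -7)
  }
}
```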

Otherwise, you can catch the exception inside the map function:

rdd.map { r =>
  try {
    r.toInt
  } catch {
    // toInt throws NumberFormatException, a subclass of RuntimeException
    case _: RuntimeException => -1
  }
}

and then filter out the -1 values, e.g. with .filter(_ != -1). Note that this also drops any input that really was "-1".
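A sketch of the same try/catch approach without a sentinel value: the catch returns an Option, so flatMap drops the failures and every parsed Int, including -1, is kept. It runs on a List; the assumption is that the lambda works unchanged in rdd.flatMap. The object name is illustrative.

```scala
object CatchInFlatMap {

  // Catch inside the function, but signal failure with None instead of -1.
  def parseInts(in: Seq[String]): Seq[Int] =
    in.flatMap { r =>
      try Some(r.toInt)
      catch { case _: NumberFormatException => None }
    }

  def main(args: Array[String]): Unit =
    println(parseInts(List("1", "-1", "abc"))) // List(1, -1)
}
```

To skip any non-fatal exception rather than only NumberFormatException, the case can be widened to case NonFatal(_) => None (with import scala.util.control.NonFatal), which is the same rule Try applies.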

answered Oct 22 '22 05:10 by banjara