Catch exceptions that are thrown in a map function in Spark

I am reading a file that has some corrupt data. My file looks like this:

30149;E;LDI0775100  350000003221374461
30153;168034601 350000003486635135

The second line shows how a record is supposed to look. The first line has some extra characters in the first column. I want to catch any exceptions thrown because of corrupt data, not only in the example above. Below is the code where I load the file into an RDD and try to catch exceptions in the map function.

    import scala.util.{Try, Success, Failure}

    val rawCustfile = sc.textFile("/tmp/test_custmap")

    case class Row1(file_id: Int, mk_cust_id: String, ind_id: Long)

    val cleanedcustmap = rawCustfile
      .map(x => x.replaceAll(";", "\t").split("\t"))
      .map(x => Try {
        Row1(x(0).toInt, x(1), x(2).toLong)
      } match {
        case Success(row) => Right(row)
        case Failure(e)   => Left(e)
      })

    // get the good rows
    val good_rows = cleanedcustmap.filter(_.isRight)

    // get the errors
    val error = cleanedcustmap.filter(_.isLeft)

    good_rows.collect().foreach(println)

    error.collect().foreach(println)

    val df = sqlContext.createDataFrame(cleanedcustmap.filter(_.isRight))

The line good_rows.collect().foreach(println) prints:

 Right(Row1(30153,168034601,350000003486635135))

The line error.collect().foreach(println) prints:

 Left(java.lang.NumberFormatException: For input string: "LDI0775100")

Everything works fine up to the point where I try to convert my RDD into a DataFrame. There I get the following exception:

 Name: scala.MatchError
 Message: Product with Serializable with 
 scala.util.Either[scala.Throwable,Row1] (of class scala.reflect.internal.Types$RefinedType0)
    StackTrace: org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:676)
    org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:630)
    org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
    $line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
    $line79.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
    $line79.$read$$iwC$$iwC$$iwC.<init>(<console>:61)
    $line79.$read$$iwC$$iwC.<init>(<console>:63)
    $line79.$read$$iwC.<init>(<console>:65)
    $line79.$read.<init>(<console>:67)
    $line79.$read$.<init>(<console>:71)
    $line79.$read$.<clinit>(<console>)
    $line79.$eval$.<init>(<console>:7)
    $line79.$eval$.<clinit>(<console>)
    $line79.$eval.$print(<console>)
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    java.lang.reflect.Method.invoke(Method.java:497)
    org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:361)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
    org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
    org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
    org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

My first question is: am I catching the exceptions the right way? I want to collect the errors so that I can print them. Is there a better way? My second question is: what am I doing wrong when converting my RDD into a DataFrame?

asked Dec 05 '25 by danilo

1 Answer

am I catching the exceptions the right way

Well, pretty much. I'm not sure that mapping the Try into an Either is all that important (you can think of a Try as a specialization of Either where the left-side type is fixed to Throwable), but both can work. Another issue you might want to fix is the use of replaceAll: it compiles a regex from its first argument (which you don't need here) and is therefore slower than replace, see Difference between String replace() and replaceAll().
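For example, here is a minimal sketch that keeps the Try as-is and uses replace, reusing rawCustfile and Row1 from the question (the variable names are illustrative):

    import scala.util.{Try, Success, Failure}

    val parsed = rawCustfile
      .map(_.replace(";", "\t").split("\t"))               // plain string replace, no regex
      .map(x => Try(Row1(x(0).toInt, x(1), x(2).toLong)))  // RDD[Try[Row1]]

    // RDD.collect with a partial function keeps only the matching elements
    val good_rows = parsed.collect { case Success(r) => r }  // RDD[Row1]
    val errors    = parsed.collect { case Failure(e) => e }  // RDD[Throwable]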

what am I doing wrong when converting my RDD into a DataFrame

DataFrames only support a limited set of "standard" types:

  • Primitives (e.g. Ints, Longs, Strings...)
  • Arrays/Maps (of other supported types)
  • Products (case classes or tuples of other supported types)

Either isn't one of these types (nor is Try), so it can't be used in a DataFrame.

You can solve it by:

  1. Using only the "successful" records in your DataFrame (what use, other than logging, would the errors serve? Either way, they should be handled separately):

    // this would work:
    val df = spark.createDataFrame(good_rows.map(_.right.get))
    
  2. Converting the Either into a supported type, e.g. a Tuple2[Row1, String] where the String is the error message, and where one of the two values in the tuple is null (the Row1 is null for erroneous records, the error message is null for successful records); a sketch of this follows below.
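A minimal sketch of the second option, reusing the cleanedcustmap RDD from the question (the names are illustrative):

    // One tuple per record: the Row1 is null for failures,
    // the error message is null for successes
    val asTuples = cleanedcustmap.map {
      case Right(row) => (row, null: String)
      case Left(e)    => (null: Row1, e.getMessage)
    }

    // A Tuple2 of supported types is itself supported, so this works
    val df = sqlContext.createDataFrame(asTuples)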

answered Dec 08 '25 by Tzach Zohar

