I am reading a file that has some corrupt data. My file looks like this:
30149;E;LDI0775100 350000003221374461
30153;168034601 350000003486635135
The second line shows how it is supposed to look. The first line has some extra characters in the first column, so I want to catch any exceptions thrown because of corrupt data (not only the example above). Below is the code where I load my file into an RDD and try to catch exceptions in the map function.
import scala.util.{Try, Success, Failure}

val rawCustfile = sc.textFile("/tmp/test_custmap")
case class Row1(file_id: Int, mk_cust_id: String, ind_id: Long)

// wrap the parse in a Try so a malformed row becomes a Left(exception)
// instead of failing the whole job
val cleanedcustmap = rawCustfile
  .map(x => x.replaceAll(";", "\t").split("\t"))
  .map(x => Try {
    Row1(x(0).toInt, x(1), x(2).toLong)
  } match {
    case Success(map) => Right(map)
    case Failure(e)   => Left(e)
  })
// get the good rows
val good_rows = cleanedcustmap.filter(_.isRight)
// get the errors
val error = cleanedcustmap.filter(_.isLeft)
good_rows.collect().foreach(println)
error.collect().foreach(println)
val df = sqlContext.createDataFrame(cleanedcustmap.filter(_.isRight))
good_rows.collect().foreach(println) prints:
Right(Row1(30153,168034601,350000003486635135))
error.collect().foreach(println) prints:
Left(java.lang.NumberFormatException: For input string: "LDI0775100")
Everything works fine up to the point where I try to convert my RDD into a DataFrame. I get the following exception:
Name: scala.MatchError
Message: Product with Serializable with scala.util.Either[scala.Throwable,Row1] (of class scala.reflect.internal.Types$RefinedType0)
StackTrace: org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:676)
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:630)
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
$line79.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
$line79.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
$line79.$read$$iwC$$iwC$$iwC.<init>(<console>:61)
$line79.$read$$iwC$$iwC.<init>(<console>:63)
$line79.$read$$iwC.<init>(<console>:65)
$line79.$read.<init>(<console>:67)
$line79.$read$.<init>(<console>:71)
$line79.$read$.<clinit>(<console>)
$line79.$eval$.<init>(<console>:7)
$line79.$eval$.<clinit>(<console>)
$line79.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:361)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:356)
org.apache.toree.global.StreamState$.withStreams(StreamState.scala:81)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
org.apache.toree.kernel.interpreter.scala.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:355)
org.apache.toree.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:140)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
My first question: am I catching the exceptions the right way? I want to capture the errors because I want to print them. Is there a better way? My second question: what am I doing wrong when converting my RDD into a DataFrame?
am I catching the exceptions the right way
Well, pretty much. I'm not sure the mapping of the Try into an Either is that important (you can think of a Try as a specialization of Either where the left-side type is fixed to Throwable), but both can work. Another issue you might want to fix is the use of replaceAll: it compiles a regex from its first argument (which you don't need in this case) and is therefore slower than replace, see Difference between String replace() and replaceAll(). The sketch below shows the same parsing step with replace.
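A minimal sketch, keeping your parsing logic identical but using a literal string replacement instead of a regex:

// same behavior, but ";" is treated as a plain string, not a regular expression
val cleanedcustmap = rawCustfile
  .map(x => x.replace(";", "\t").split("\t"))
  .map(x => Try(Row1(x(0).toInt, x(1), x(2).toLong)) match {
    case Success(row) => Right(row)
    case Failure(e)   => Left(e)
  })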
what am I doing wrong when converting my RDD into a DataFrame
DataFrames only support a limited set of "standard" types: primitives (Int, Long, Double, Boolean, etc.), String, BigDecimal, java.sql.Date/Timestamp, binary, and Seqs, Maps, Options and case classes (Products) built from these.
Either isn't one of these types (nor is Try), so it can't be used in a DataFrame.
You can solve it by:
Using only the "successful" records in your DataFrame (what use, other than logging, would the errors serve? Either way they should be handled separately):
// this would work (using the sqlContext from the question):
val df = sqlContext.createDataFrame(good_rows.map(_.right.get))
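Note that _.right.get is safe here only because good_rows was already filtered with isRight; calling it on a Left would throw.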
Converting the Either into a supported type, e.g. a Tuple2[Row1, String] where the String holds the error message and one of the two values is null (the Row1 is null for erroneous records, the message is null for successful ones), as sketched below.
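A minimal sketch of that second option, assuming the cleanedcustmap RDD from the question (the column names "row" and "error" are just illustrative):

// flatten each Either into a (Row1, String) pair, which Spark can encode:
// successful records carry a null message, failed records a null Row1
val flattened = cleanedcustmap.map {
  case Right(row) => (row, null: String)
  case Left(e)    => (null: Row1, e.getMessage)
}
val df = sqlContext.createDataFrame(flattened).toDF("row", "error")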