Error handling with Try match inside a UDF, and logging the rows where it failed

Scala version 2.11 and Spark 2.0.1.

I have a dataframe on which I run some operations inside a UDF. I want the operations to run on every row and to report an error only for the rows where they fail. I would also like to return success/failure as an additional field; pass/fail can go in a separate column.

This is what I tried:

val df = Seq(("as", 1, "df"), ("1", 2, "3")).toDF("a", "b", "c")
val df1 = Seq(("1", 1, "3"), ("1", 2, "3")).toDF("a", "b", "c")

def myUdf = udf((i: String, j: Int, k: Int) => { 
   def test (ii:String, jj:Int, kk:Int): Try[Int] = {
     val q = i.toInt * j * k.toInt
     val m = q * i.toInt
     return (Try(q))
  }
  val q = Try(test(i, j, k)) match { 
    case Success(lines) => lines.toString
    case _ => "Failed"
  }
  q
})

// First example
val df2 = df.withColumn("D", myUdf($"a", $"b", $"c")) // <-- this fails

// Second example
val df3 = df1.withColumn("D", myUdf($"a", $"b", $"c"))
df3.show
  +---+---+---+----------+
  |  a|  b|  c|         D|
  +---+---+---+----------+
  |  1|  1|  3|Success(3)|
  |  1|  2|  3|Success(6)|
  +---+---+---+----------+

1) How can I get the plain integer values, i.e. 3 and 6 instead of Success(3) and Success(6), without the Success wrapper and the parentheses? (3 and 6 can be strings.) Also, how do I add success/failure to each row?

2) Is it possible to use Try with match to detect when the UDF fails, without adding error handling at each step? How do we move on to the next computation when one fails? Note: there are many computations inside the 'test' method.

3) What are some alternative ways of checking the UDF globally?

Terry asked Apr 27 '18 00:04


1 Answer

You can do this with Try. Note, however, that the Try should surround the whole body of the test method rather than being applied only to the result (you also should not use the return keyword here). After that, use match to extract the result.

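import scala.util.{Success, Try}
import org.apache.spark.sql.functions.udf

def myUdf = udf((i: String, j: Int, k: String) => {
  // Wrap the whole body in Try so that any exception thrown along the way is captured.
  def test(ii: String, jj: Int, kk: String): Try[Int] = Try {
    val q = ii.toInt * jj * kk.toInt
    val m = q * ii.toInt // intermediate value, not used in the result
    q
  }

  // Unwrap the Try: the plain value as a String on success, a marker otherwise.
  test(i, j, k) match {
    case Success(lines) => lines.toString
    case _ => "Failed"
  }
})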
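
As a side note on why the attempt in the question printed Success(3): there, test already returned a Try, and the call was wrapped in a second Try, so the match unwrapped only the outer layer. The following is a minimal plain-Scala sketch of that effect, without Spark; the object and method names (NestedTryDemo, inner, nested, flat) are made up for illustration.

```scala
import scala.util.{Failure, Success, Try}

object NestedTryDemo {
  // Hypothetical helper that already returns a Try, like test in the question.
  def inner(s: String): Try[Int] = Try(s.toInt)

  // Wrapping the call in a second Try gives Try[Try[Int]], so the value
  // bound in the Success case is itself a Try and prints with its wrapper.
  def nested(s: String): String = Try(inner(s)) match {
    case Success(v) => v.toString
    case Failure(_) => "Failed"
  }

  // Matching on the single Try binds the bare Int.
  def flat(s: String): String = inner(s) match {
    case Success(v) => v.toString
    case Failure(_) => "Failed"
  }
}
```

Here NestedTryDemo.nested("3") gives "Success(3)", while NestedTryDemo.flat("3") gives "3" and NestedTryDemo.flat("as") gives "Failed".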

Note that k and kk are of type String, since that is the type of column c in both test dataframes. If you declare them as Int and a column value can't be implicitly cast (such as "df"), the udf will not run for that row and you will get a null.

Result using the two dataframes:

+---+---+---+------+
|  a|  b|  c|     D|
+---+---+---+------+
| as|  1| df|Failed|
|  1|  2|  3|     6|
+---+---+---+------+

+---+---+---+---+
|  a|  b|  c|  D|
+---+---+---+---+
|  1|  1|  3|  3|
|  1|  2|  3|  6|
+---+---+---+---+
  1. As can be seen, this gives only the value or "Failed" as the result. The Success wrapper is removed because the result is returned as a String.

  2. Upon failure in the test method, an exception will be raised which is caught by the Try. This means that the method will exit upon failure and not continue on to the end.

  3. To find all rows which failed, use the filter method: df2.filter($"D" === "Failed").
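
The question also asked for pass/fail in a separate column, which the string result above does not provide directly. One possible approach, sketched here as an assumption rather than as part of the answer, is to return a pair of (value, status) from the function. The names RowResult and compute, and the "Pass"/"Fail" labels, are hypothetical; the Spark wiring is shown only in comments and has not been run here.

```scala
import scala.util.{Failure, Success, Try}

object RowResult {
  // Hypothetical helper: the same arithmetic as test above, but returning
  // the value (if any) together with a status string.
  def compute(i: String, j: Int, k: String): (Option[Int], String) =
    Try(i.toInt * j * k.toInt) match {
      case Success(v) => (Some(v), "Pass")
      case Failure(e) => (None, s"Fail: ${e.getClass.getSimpleName}")
    }
}

// Assumed Spark usage (not executed here): a udf returning a tuple produces
// a struct column whose fields are named _1 and _2.
//
//   val resultUdf = udf(RowResult.compute _)
//   df.withColumn("res", resultUdf($"a", $"b", $"c"))
//     .select($"a", $"b", $"c", $"res._1".as("D"), $"res._2".as("status"))
```

With the sample rows, RowResult.compute("1", 2, "3") yields (Some(6), "Pass") and RowResult.compute("as", 1, "df") yields (None, "Fail: NumberFormatException"). Keeping the status separate lets the value column stay numeric, with null for failed rows.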

Shaido answered Oct 23 '22 06:10