Passing nullable columns as parameter to Spark SQL UDF

Here is a Spark UDF that I'm using to compute a value from a few columns.

def spark_udf_func(s: String, i: Int): Boolean = {
    // Returns true regardless of the parameters passed to it.
    true
}

val spark_udf = org.apache.spark.sql.functions.udf(spark_udf_func _)

val df = sc.parallelize(Array[(Option[String], Option[Int])](
  (Some("Rafferty"), Some(31)), 
  (null, Some(33)), 
  (Some("Heisenberg"), Some(33)),  
  (Some("Williams"), null)
)).toDF("LastName", "DepartmentID")

df.withColumn("valid", spark_udf(df.col("LastName"), df.col("DepartmentID"))).show()
+----------+------------+-----+
|  LastName|DepartmentID|valid|
+----------+------------+-----+
|  Rafferty|          31| true|
|      null|          33| true|
|Heisenberg|          33| true|
|  Williams|        null| null|
+----------+------------+-----+

Can anyone explain why the value of the valid column is null for the last row?

When I checked the Spark plan, I could see that it contains a conditional: if the second column (DepartmentID) is null, the result is null and the UDF is not called.

== Physical Plan ==

*Project [_1#699 AS LastName#702, _2#700 AS DepartmentID#703, if (isnull(_2#700)) null else UDF(_1#699, _2#700) AS valid#717]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), assertnotnull(input[0, scala.Tuple2, true])._1), true) AS _1#699, unwrapoption(IntegerType, assertnotnull(input[0, scala.Tuple2, true])._2) AS _2#700]
   +- Scan ExternalRDDScan[obj#698]

Why does Spark behave this way?
Why does it affect only Integer columns?
What am I doing wrong here, and what is the proper way to handle nulls within a UDF when a UDF parameter is null?

Sudev Ambadi asked Sep 05 '17 09:09

1 Answer

The issue is that null is not a valid value for a Scala Int (the type backing the second UDF parameter), while it is a valid value for String. Int is equivalent to the Java int primitive and must always hold a value. This means the UDF can't be called when the input is null, so Spark returns null for that row instead.
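
To make the distinction concrete, here is a minimal plain-Scala sketch (no Spark involved). The object and value names are made up for illustration. It shows that a boxed java.lang.Integer can hold null, while forcing null into an Int quietly yields 0, which a UDF could not tell apart from a real 0.

```scala
object IntNullSketch {
  // A boxed java.lang.Integer is a reference type, so null is allowed.
  val boxed: java.lang.Integer = null

  // val primitive: Int = null  // does not compile: Int is a value type

  // Casting null to Int unboxes to the default value 0 instead of failing.
  val forced: Int = null.asInstanceOf[Int]

  def main(args: Array[String]): Unit = {
    println(boxed == null)
    println(forced)
  }
}
```

This is likely why Spark guards the call with the isnull check seen in the plan rather than passing a default value into the function.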

There are two ways to solve this:

  1. Change the function to accept java.lang.Integer (which is an object and can be null).
  2. If you can't change the function, use when/otherwise to handle the null case separately. For example: when(col("int col").isNull, someValue).otherwise(the original call)
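
Both options can be sketched against the data from the question. This is an illustration under some assumptions, not a definitive implementation: the object name, the local SparkSession settings, the UDF names, and the fallback value false are all made up here, and None replaces the null tuple entries from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, udf, when}

object NullableUdfSketch {
  // Local session for the sketch only; master and app name are assumptions.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("nullable-udf-sketch")
    .getOrCreate()
  import spark.implicits._

  // Same rows as in the question, with None marking the missing values.
  val df: DataFrame = Seq[(Option[String], Option[Int])](
    (Some("Rafferty"), Some(31)),
    (None, Some(33)),
    (Some("Heisenberg"), Some(33)),
    (Some("Williams"), None)
  ).toDF("LastName", "DepartmentID")

  // Option 1: a boxed java.lang.Integer parameter can hold null,
  // so Spark invokes the UDF even when DepartmentID is null.
  val boxedUdf = udf((s: String, i: java.lang.Integer) => true)
  val withBoxed: DataFrame =
    df.withColumn("valid", boxedUdf(col("LastName"), col("DepartmentID")))

  // Option 2: keep the primitive Int signature and handle null outside the UDF.
  val primitiveUdf = udf((s: String, i: Int) => true)
  val withGuard: DataFrame = df.withColumn(
    "valid",
    when(col("DepartmentID").isNull, lit(false)) // false is an arbitrary fallback
      .otherwise(primitiveUdf(col("LastName"), col("DepartmentID")))
  )

  def main(args: Array[String]): Unit = {
    withBoxed.show()
    withGuard.show()
    spark.stop()
  }
}
```

With the boxed parameter, the "Williams" row should come out as true because the function now runs for it. With the when/otherwise guard, that row takes whatever fallback you choose (false in this sketch). Inside a UDF that takes java.lang.Integer, remember to check for null before unboxing the value.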

A good explanation of this can be found here

Assaf Mendelson answered Oct 05 '22 02:10