The issues:
1) Spark doesn't call the UDF if the input is a column of a primitive type that contains null (a sketch for constructing the inputDF used here follows after issue 2):
inputDF.show()
+-----+
|    x|
+-----+
| null|
|  1.0|
+-----+
inputDF
  .withColumn("y",
    udf { (x: Double) => 2.0 }.apply($"x") // will not be invoked if $"x" == null
  )
  .show()
+-----+-----+
|    x|    y|
+-----+-----+
| null| null|
|  1.0|  2.0|
+-----+-----+
2) You can't produce null from a UDF as a column of primitive type:
udf { (x: String) => null: Double } // compile error
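For reference, here is a minimal way to construct the inputDF used in issue 1 and in the rest of the examples (a sketch; the SparkSession value name spark is an assumption):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Explicit schema so "x" is a nullable Double column, which is what
// triggers the primitive-null behaviour described in issue 1.
val inputDF = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(null), Row(1.0))),
  StructType(Seq(StructField("x", DoubleType, nullable = true)))
)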
In Spark, the filter() or where() functions of a DataFrame can be used to filter out rows with NULL values by checking IS NOT NULL or isNotNull. These remove all rows with null values in the state column and return a new DataFrame; all of these variants return the same output.
To remove rows with NULL values in selected columns of a Spark DataFrame, use na.drop(columns: Seq[String]) or na.drop(columns: Array[String]), passing the names of the columns you want to check for NULL values.
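A rough sketch of both approaches (the DataFrame df and the column name state are assumptions here):

import org.apache.spark.sql.functions.col

// Keep only rows where "state" is not null; filter and where are equivalent.
val filtered  = df.filter(col("state").isNotNull)
val filtered2 = df.where("state IS NOT NULL")

// Drop rows that contain a null in any of the listed columns.
val dropped = df.na.drop(Seq("state"))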
Why are Spark UDFs problematic? When we use UDFs we lose the optimizations Spark applies to our DataFrame/Dataset: to Spark's Catalyst optimizer, a UDF is as good as a black box.
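For the specific null handling in this question, the same logic can often be written with built-in expressions that Catalyst can optimize; a sketch (my variant, not part of the original answers):

import org.apache.spark.sql.functions.{lit, when}

// Same result as the boxed-type UDF shown below: 1 when x is null,
// otherwise null (when(...) without otherwise() yields null).
inputDF
  .withColumn("y", when($"x".isNull, lit(1)))
  .show()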
As of 30 October 2017, Spark has introduced vectorized UDFs for PySpark. The reason Python UDFs are slow is probably that PySpark UDFs are not implemented in the most optimized way; according to the paragraph from the link, Spark added a Python API in version 0.7, with support for user-defined functions.
According to the docs:
Note that if you use primitive parameters, you are not able to check if it is null or not, and the UDF will return null for you if the primitive input is null. Use boxed type or [[Option]] if you wanna do the null-handling yourself.
So, the easiest solution is to use boxed types if your UDF input is a nullable column of primitive type and/or you need to output null from the UDF as a column of primitive type:
inputDF
  .withColumn("y",
    udf { (x: java.lang.Double) =>
      (if (x == null) 1 else null): java.lang.Integer
    }.apply($"x")
  )
  .show()
+-----+-----+
|    x|    y|
+-----+-----+
| null|    1|
|  1.0| null|
+-----+-----+
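The [[Option]] part of the docs quote works the same way on the return side; here is a sketch combining a boxed input with an Option result (my variant, not from the original answer):

inputDF
  .withColumn("y",
    udf { (x: java.lang.Double) =>
      // None is written back as null in the resulting integer column.
      if (x == null) Some(1) else None
    }.apply($"x")
  )
  .show()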
I would also use Artur's solution, but there is another way to do it, without Java's wrapper classes, by using struct:
import org.apache.spark.sql.functions.struct
import org.apache.spark.sql.Row

inputDF
  .withColumn("y",
    udf { (r: Row) =>
      if (r.isNullAt(0)) Some(1) else None
    }.apply(struct($"x"))
  )
  .show()