 

SparkSQL: How to deal with null values in user defined function?

Given Table 1 with one column "x" of type String. I want to create Table 2 with a column "y" that is an integer representation of the date strings given in "x".

It is essential to keep the null values in column "y".

Table 1 (Dataframe df1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+

root
 |-- x: string (nullable = true)

Table 2 (Dataframe df2):

+----------+--------+
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+

root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)

While the user-defined function (udf) to convert values from column "x" into those of column "y" is:

val extractDateAsInt = udf[Int, String] (
  (d: String) => d.substring(0, 10)
    .filterNot("-".toSet)
    .toInt
)

and works for non-null values, dealing with null values is not possible.
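For illustration, applying this UDF directly to df1 blows up at execution time as soon as a null row is evaluated, since d.substring(0, 10) is called on a null reference (a minimal sketch, assuming df1 from Table 1):

// fails with a SparkException wrapping a java.lang.NullPointerException
// once the action reaches one of the null rows of column "x"
df1.withColumn("y", extractDateAsInt(df1("x"))).show()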

Even though I can do something like

val extractDateAsIntWithNull = udf[Int, String] (
  (d: String) =>
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt
    else 1
)

I have found no way to "produce" null values via UDFs (naturally, since an Int cannot be null).

My current solution for creating df2 (Table 2) is as follows:

// holds data of table 1
val df1 = ...

// filter entries from df1 that are not null
val dfNotNulls = df1.filter(df1("x").isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left outer join of df1 and dfNotNulls
val df2 = df1.join(
  dfNotNulls,
  df1("x") === dfNotNulls("right_x"),
  "leftouter"
).drop("right_x")
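A quick sanity check (a sketch; it simply reproduces the expected output from Table 2, and row order may vary):

df2.show()
// +----------+--------+
// |         x|       y|
// +----------+--------+
// |      null|    null|
// |      null|    null|
// |2015-09-12|20150912|
// |2015-09-13|20150913|
// +----------+--------+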

Questions:

  • The current solution seems cumbersome (and probably inefficient with regard to performance). Is there a better way?
  • @Spark-developers: Is there a type NullableInt planned / available, such that the following UDF is possible (see the Code excerpt below)?

Code excerpt

val extractDateAsNullableInt = udf[NullableInt, String] (
  (d: String) =>
    if (d != null) d.substring(0, 10).filterNot("-".toSet).toInt
    else null
)
asked Sep 02 '15 by Martin Senne


2 Answers

This is where Option comes in handy:

val extractDateAsOptionInt = udf((d: String) => d match {
  case null => None
  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})

or, to make it slightly safer in the general case:

import scala.util.Try

val extractDateAsOptionInt = udf((d: String) => Try(
  d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)

All credit goes to Dmitriy Selivanov, who pointed out this solution as a (missing?) edit here.
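Either variant is applied like any other UDF, and the Option is unwrapped into a nullable integer column (a minimal sketch, assuming the df1 from the question):

val df2 = df1.withColumn("y", extractDateAsOptionInt(df1("x")))
df2.printSchema()
// root
//  |-- x: string (nullable = true)
//  |-- y: integer (nullable = true)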

An alternative is to handle null outside the UDF:

import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType

val extractDateAsInt = udf(
  (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)

df.withColumn("y",
  when($"x".isNull, lit(null))
    .otherwise(extractDateAsInt($"x"))
    .cast(IntegerType)
)
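Note the trailing .cast(IntegerType): lit(null) by itself carries no useful type (it is of NullType), so the explicit cast ensures that "y" ends up as the integer type shown in Table 2, rather than relying on Spark's type coercion.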
answered Sep 19 '22 by zero323


Scala actually has a nice factory function, Option(), that can make this even more concise:

val extractDateAsOptionInt = udf((d: String) =>
  Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))

Internally the Option object's apply method is just doing the null check for you:

def apply[A](x: A): Option[A] = if (x == null) None else Some(x) 
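A quick REPL illustration of that behavior (hypothetical session):

scala> Option(null)
res0: Option[Null] = None

scala> Option("2015-09-12")
res1: Option[String] = Some(2015-09-12)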
answered Sep 22 '22 by tristanbuckner