
Convert null values to empty array in Spark DataFrame


I have a Spark data frame where one column is an array of integers. The column is nullable because it is coming from a left outer join. I want to convert all null values to an empty array so I don't have to deal with nulls later.

I thought I could do it like so:

val myCol = df("myCol")

df.withColumn( "myCol", when(myCol.isNull, Array[Int]()).otherwise(myCol) )

However, this results in the following exception:

java.lang.RuntimeException: Unsupported literal type class [I [I@5ed25612
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:49)
    at org.apache.spark.sql.functions$.lit(functions.scala:89)
    at org.apache.spark.sql.functions$.when(functions.scala:778)

Apparently array types are not supported by the when function. Is there some other easy way to convert the null values?

In case it is relevant, here is the schema for this column:

|-- myCol: array (nullable = true)
|    |-- element: integer (containsNull = false)
asked Jan 07 '16 by Daniel Siegmann

People also ask

How do I change the null value in Spark DataFrame?

In Spark, the fill() function of the DataFrameNaFunctions class is used to replace NULL values in DataFrame columns with zero (0), an empty string, a space, or any constant literal value.
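A minimal sketch of fill() on simple column types (the SparkSession setup and the DataFrame below are hypothetical, not from the question):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").appName("fill-example").getOrCreate()
import spark.implicits._

// Hypothetical data: "score" and "label" contain nulls
val people = Seq(("alice", Some(1), Some("x")), ("bob", None, None)).toDF("name", "score", "label")

// fill() applies to simple types such as numbers and strings; it does not fill
// array columns, which is why the question above needs a different approach.
people.na.fill(0).na.fill("").show()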

How do you handle NULL values in Spark?

You can keep null values out of certain columns by setting nullable to false. You won't be able to set nullable to false for all columns in a DataFrame and pretend that null values don't exist. For example, when joining DataFrames, the join column will contain null where a match cannot be made.
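A hedged sketch of how a left outer join introduces such nulls (reusing the spark session and implicits from the sketch above; the tables are made up for illustration):

// Hypothetical tables: every user exists, but only some have tags
val users = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val tags  = Seq((1, Array(10, 20))).toDF("id", "tags")

// id = 2 has no match in "tags", so its "tags" column comes back null,
// the same situation described in the question above.
users.join(tags, Seq("id"), "left_outer").show()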

How do I change NULL values in a DataFrame PySpark?

In PySpark, DataFrame.fillna() or DataFrameNaFunctions.fill() is used to replace NULL/None values in all or selected DataFrame columns with zero (0), an empty string, a space, or any constant literal value.


2 Answers

You can use a UDF:

import org.apache.spark.sql.functions.udf

val array_ = udf(() => Array.empty[Int])

combined with WHEN or COALESCE:

df.withColumn("myCol", when(myCol.isNull, array_()).otherwise(myCol))
df.withColumn("myCol", coalesce(myCol, array_())).show

In recent versions you can use the array function:

import org.apache.spark.sql.functions.{array, lit}

df.withColumn("myCol", when(myCol.isNull, array().cast("array<integer>")).otherwise(myCol))
df.withColumn("myCol", coalesce(myCol, array().cast("array<integer>"))).show

Please note that it will work only if conversion from string to the desired type is allowed.
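If you want to double-check the type produced by the cast, a small sketch (it assumes an existing SparkSession named spark; the toy data is hypothetical and mirrors the question's schema):

import org.apache.spark.sql.functions.{array, coalesce, col}
import spark.implicits._

// Hypothetical nullable array<int> column, similar to myCol in the question
val sample = Seq(("a", Some(Seq(1, 2, 3))), ("b", Option.empty[Seq[Int]])).toDF("id", "myCol")

sample
  .withColumn("myCol", coalesce(col("myCol"), array().cast("array<integer>")))
  .printSchema()
// the printed schema should show myCol as an array column with integer elements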

The same thing can of course be done in PySpark as well. For the legacy solution you can define a udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def empty_array(t):
    return udf(lambda: [], ArrayType(t()))()

coalesce(myCol, empty_array(IntegerType()))

and in recent versions just use array:

from pyspark.sql.functions import array

coalesce(myCol, array().cast("array<integer>"))
answered Sep 17 '22 by zero323

With a slight modification to zero323's approach, I was able to do this without using a udf in Spark 2.3.1.

val df = Seq("a" -> Array(1,2,3), "b" -> null, "c" -> Array(7,8,9)).toDF("id","numbers")
df.show

+---+---------+
| id|  numbers|
+---+---------+
|  a|[1, 2, 3]|
|  b|     null|
|  c|[7, 8, 9]|
+---+---------+

val df2 = df.withColumn("numbers", coalesce($"numbers", array()))
df2.show

+---+---------+
| id|  numbers|
+---+---------+
|  a|[1, 2, 3]|
|  b|       []|
|  c|[7, 8, 9]|
+---+---------+
answered Sep 20 '22 by Jeremy