 

Spark SQL: how to explode without losing null values

I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,

id | name | likes
-------------------------------
1  | Luke | [baseball, soccer]

should become

id | name | likes
-------------------------------
1  | Luke | baseball
1  | Luke | soccer

This is my code:

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(),
                    org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:

id | name | likes
-------------------------------
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

id | name | likes
-------------------------------
1  | Luke | baseball
1  | Luke | soccer

instead of

id | name | likes
-------------------------------
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

I am using Spark 1.5.2 and Java 8

asked Sep 28 '16 by alexgbelov

People also ask

How does Spark ignore NULL values?

In order to remove rows with NULL values in selected columns of a Spark DataFrame, use drop(columns: Seq[String]) or drop(columns: Array[String]) from DataFrameNaFunctions. Pass these functions the names of the columns you want to check for NULL values; rows containing NULLs in those columns are deleted.
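For instance, a minimal sketch in a spark-shell session (the DataFrame and column names are illustrative):

import spark.implicits._

val df = Seq((1, "Luke", Some("baseball")), (2, "Lucy", None)).toDF("id", "name", "likes")

// Drop rows where "likes" is NULL; pass one or more column names to check
df.na.drop(Seq("likes")).show()
// Only Luke's row remains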

How does SQL handle nulls in Spark?

Scala code should return None (or null) for values that are unknown, missing, or irrelevant. DataFrames should also use null for values that are unknown, missing, or irrelevant. Use Option in Scala code and fall back on null if Option becomes a performance bottleneck.
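A minimal sketch of that convention, with the same spark-shell assumptions as above (the lookupLikes helper and its values are made up for illustration):

// Option on the Scala side; fall back on null only at the DataFrame boundary
def lookupLikes(name: String): Option[String] =
  if (name == "Luke") Some("baseball") else None

val df = Seq("Luke", "Lucy")
  .map(n => (n, lookupLikes(n).orNull))
  .toDF("name", "likes")

df.show()  // Lucy's "likes" value is null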

Does Spark join NULL values?

Spark SQL supports a null ordering specification in the ORDER BY clause. Spark processes the ORDER BY clause by placing all the NULL values first or last depending on that specification.
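For example, with the Column API in newer Spark versions (2.1+), reusing the illustrative df with a nullable "likes" column from the sketch above:

import org.apache.spark.sql.functions.col

df.orderBy(col("likes").asc_nulls_last).show()    // NULLs placed last
df.orderBy(col("likes").desc_nulls_first).show()  // NULLs placed first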

How do I replace NULL with zero in Spark?

In Spark, the fill() function of the DataFrameNaFunctions class is used to replace NULL values in DataFrame columns with zero (0), an empty string, a space, or any other constant literal value.
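A minimal sketch, again reusing the illustrative df from above:

df.na.fill(0)                       // replace NULLs in numeric columns with 0
df.na.fill("none", Seq("likes"))    // replace NULLs in the string column "likes" with a constant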


1 Answer

Spark 2.2+

You can use the explode_outer function:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
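If you want to keep the question's loop over every array column, a sketch in Scala could look like the following, assuming Spark 2.2+ (the Java version would follow the same pattern):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.explode_outer
import org.apache.spark.sql.types.ArrayType

// Explode every array column, keeping rows whose array is null
def explodeDataFrame(df: DataFrame): DataFrame =
  df.schema.fields.foldLeft(df) { (acc, field) =>
    field.dataType match {
      case _: ArrayType => acc.withColumn(field.name, explode_outer(acc.col(field.name)))
      case _            => acc
    }
  }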

Spark <= 2.1

This is Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))

The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st = StructType(Seq(
  StructField("_1", IntegerType, false),
  StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

or

dfStruct.withColumn("y", explode(   when(col("y").isNotNull, col("y"))     .otherwise(array(lit(null).cast("struct<_1:int,_2:string>"))))) 

Note:

If the array Column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true))) 
answered by zero323