I have a DataFrame that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,
id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
should become
id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
This is my code
private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(),
                org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}
The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:
id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null
becomes
id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
instead of
id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null
How can I explode my arrays so that I don't lose the null rows?
I am using Spark 1.5.2 and Java 8
Spark 2.2+
You can use the explode_outer function:
import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
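Since the question uses Java, a minimal sketch of the same call from the Java API (assuming an upgrade to Spark 2.2+, where a DataFrame is a Dataset<Row>, and a column named "likes" as in the example) could look like:

import static org.apache.spark.sql.functions.explode_outer;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Replaces the array column with one row per element; rows where the
// array is null are kept, with a null value in the exploded column.
Dataset<Row> exploded = df.withColumn("likes", explode_outer(df.col("likes")));
exploded.show();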
Spark <= 2.1
In Scala, but the Java equivalent should be almost identical (to import individual functions use import static).
import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))
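A rough Java translation of the same trick (a sketch, assuming the Spark 1.x Java API that the question uses, static imports from org.apache.spark.sql.functions, and a DataFrame named df with a "likes" column) might be:

import static org.apache.spark.sql.functions.array;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.DataFrame;

// When the array column is null, substitute a one-element array holding a
// null string so explode() still emits a single row with a null value.
DataFrame exploded = df.withColumn("likes",
    explode(
        when(col("likes").isNotNull(), col("likes"))
            .otherwise(array(lit(null).cast("string")))));

You could drop this expression into the loop in your explodeDataFrame method in place of the plain explode call.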
The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st = StructType(Seq(
  StructField("_1", IntegerType, false),
  StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))
or
dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
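If you need the struct variant from Java, a hedged sketch of the DDL-string version (reusing the "y" column and struct<_1:int,_2:string> type from the example above; if your Spark version's cast(String) does not accept complex type strings, build the StructType with DataTypes instead, as in the schema-based variant) could be:

// Same pattern for an array-of-struct column, casting the null literal
// to the struct element type via a DDL type string.
DataFrame explodedStruct = dfStruct.withColumn("y",
    explode(
        when(col("y").isNotNull(), col("y"))
            .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))));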
Note: If the array Column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))