We have a PySpark DataFrame with several columns that contain arrays with multiple values. Our goal is to put each value of these array columns on its own row while keeping the original columns separate. So, starting with something like this:
data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
]
Which looks like this:
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |[a, c]|[1, 5]|
|B |[a, b]|null |
|C |[] |[1] |
+---+------+------+
We would like to end up having:
+---+----+----+
|id |col |col |
+---+----+----+
|A |a |null|
|A |c |null|
|A |null|1 |
|A |null|5 |
|B |a |null|
|B |b |null|
|C |null|1 |
+---+----+----+
We have considered several approaches, but all of them feel like dirty, complex, error-prone and inefficient workarounds.
Does anyone have an idea about how to solve this in an elegant manner?
To split array column data into rows, PySpark provides a function called explode().
explode(): the PySpark function explode(col) turns an array or map column into rows. When an array is passed to this function, it returns one row per array element in a new column that is named "col" by default (for a map, the default columns are "key" and "value"). Rows whose array is null or empty are dropped; explode_outer() keeps such rows and produces null instead.
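To make the difference between explode() and explode_outer() concrete, here is a minimal plain-Python sketch of their row-level behaviour. It does not use Spark at all: explode_rows is a hypothetical helper written only for this illustration, and rows are modelled as dictionaries.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def explode_rows(rows: List[Row], column: str, outer: bool = False) -> List[Row]:
    """Model of explode (outer=False) and explode_outer (outer=True) on one array column."""
    result: List[Row] = []
    for row in rows:
        values = row[column]
        if values:
            # Non-null, non-empty array: one output row per element.
            for value in values:
                result.append({**row, column: value})
        elif outer:
            # Null or empty array: explode_outer keeps the row with a null value.
            result.append({**row, column: None})
        # Plain explode emits nothing for a null or empty array.
    return result


rows = [
    {"id": "A", "list_a": ["a", "c"]},
    {"id": "B", "list_a": None},
    {"id": "C", "list_a": []},
]

exploded = explode_rows(rows, "list_a")
exploded_outer = explode_rows(rows, "list_a", outer=True)
print(exploded)
print(exploded_outer)
```

With explode only the two rows for "A" survive, while the explode_outer variant also keeps "B" and "C" with a null in list_a.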
Spark's pivot() function rotates the values of one DataFrame/Dataset column into multiple columns (rows to columns), and unpivot transforms them back (columns to rows).
PySpark SQL provides the split() function to convert a delimiter-separated string column into an array column (StringType to ArrayType) on a DataFrame. It splits the string on a delimiter such as a space, comma or pipe and returns the parts as an ArrayType column.
In case both list_a and list_b can be null in the same row, I would add a fourth case to the dataset:
data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
("D", None, None),
]
df = spark.createDataFrame(data,["id","list_a","list_b"])
I would then split the original df into three parts (both columns null, list_a exploded, and list_b exploded) and then combine them with unionByName:
from pyspark.sql.functions import col, explode_outer, lit
dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
.withColumn("list_a", lit(None))\
.withColumn("list_b", lit(None))
df1 = df\
.withColumn("list_a", explode_outer(col("list_a")))\
.withColumn("list_b", lit(None))\
.filter(~col("list_a").isNull())
df2 = df\
.withColumn("list_b", explode_outer(col("list_b")))\
.withColumn("list_a", lit(None))\
.filter(~col("list_b").isNull())
merged_df = df1.unionByName(df2).unionByName(dfnulls)
merged_df.show()
+---+------+------+
| id|list_a|list_b|
+---+------+------+
| A| a| null|
| A| c| null|
| B| a| null|
| B| b| null|
| A| null| 1|
| A| null| 5|
| C| null| 1|
| D| null| null|
+---+------+------+
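To see which input rows end up in which part of the split, here is a plain-Python sketch of the same logic, with no Spark involved. split_explode_union is a hypothetical name used only for this illustration, and the fifth row "E" (an empty list plus a null) is an assumption that is not part of the original data; it is there to show that such a row produces no output under this scheme, because the dfnulls filter only matches rows where both columns are null.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def split_explode_union(rows: List[Row]) -> List[Row]:
    """Plain-Python model of the df1 / df2 / dfnulls split followed by the union."""
    # df1: one row per non-null element of list_a, with list_b forced to null.
    df1 = [{"id": r["id"], "list_a": a, "list_b": None}
           for r in rows for a in (r["list_a"] or []) if a is not None]
    # df2: one row per non-null element of list_b, with list_a forced to null.
    df2 = [{"id": r["id"], "list_a": None, "list_b": b}
           for r in rows for b in (r["list_b"] or []) if b is not None]
    # dfnulls: only rows where both columns are null (an empty list is not null).
    dfnulls = [{"id": r["id"], "list_a": None, "list_b": None}
               for r in rows if r["list_a"] is None and r["list_b"] is None]
    return df1 + df2 + dfnulls


rows = [
    {"id": "A", "list_a": ["a", "c"], "list_b": ["1", "5"]},
    {"id": "B", "list_a": ["a", "b"], "list_b": None},
    {"id": "C", "list_a": [], "list_b": ["1"]},
    {"id": "D", "list_a": None, "list_b": None},
    {"id": "E", "list_a": [], "list_b": None},  # hypothetical extra row
]

merged = split_explode_union(rows)
merged_ids = [r["id"] for r in merged]
for r in merged:
    print(r)
```

If rows like "E" should also appear once in the result, the dfnulls condition would have to treat empty arrays the same way as nulls.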
The following approach is written in Scala and might help you.
It explodes each list column individually and then joins the two datasets on a dummy column to get the desired result.
import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}
val df1 = inputDF
.withColumn("list_a", explode_outer(col("list_a")))
.withColumn("random_join_col", concat(col("id"), lit("1")))
.drop("list_b")
val df2 = inputDF
.withColumn("list_b", explode_outer(col("list_b")))
.withColumn("random_join_col", concat(col("id"), lit("2")))
.drop("list_a")
val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")
// Drop rows that have null in both list columns
finalDF.na.drop(how = "all", Seq("list_a","list_b")).orderBy("id").show(false)
Try this dynamic solution.
Input:
data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
]
df=spark.createDataFrame(data,["id","list_a","list_b"])
df.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |[a, c]|[1, 5]|
|B |[a, b]|null |
|C |[] |[1] |
+---+------+------+
Let's create an array of DataFrames, one for each array column in df. Initialize each entry with an empty DataFrame and then overwrite it in the for loop. For each column, explode it and replace every other array column with a NULL cast to string.
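from pyspark.sql.functions import col, explode, expr
from pyspark.sql.types import StructType

array_cols = df.columns[1:]  # every column except the ID column
c = 0
# Placeholder DataFrames, overwritten in the loop below
dfarr = [spark.createDataFrame([], schema=StructType()) for i in array_cols]
for i in array_cols:
    # Explode the current array column into one row per element
    dfarr[c] = df.withColumn(i, explode(col(i)))
    for j in array_cols:
        if i != j:
            # Replace every other array column with a string-typed NULL
            dfarr[c] = dfarr[c].withColumn(j, expr(" cast(null as string) "))
    c = c + 1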
Now dfarr is an array of DataFrames that all share the same schema:
dfarr[0].printSchema()
root
|-- id: string (nullable = true)
|-- list_a: string (nullable = true)
|-- list_b: string (nullable = true)
dfarr[1].show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |null |1 |
|A |null |5 |
|C |null |1 |
+---+------+------+
The DataFrames in dfarr now all have the same column names and data types, so we can simply union them. For this we need the reduce function from functools:
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionByName, dfs)
Applying it to our dfarr:
combo=unionAll(*dfarr)
combo.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |a |null |
|A |c |null |
|B |a |null |
|B |b |null |
|A |null |1 |
|A |null |5 |
|C |null |1 |
+---+------+------+
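The reduce call folds the list pairwise from left to right, so for three DataFrames it behaves like dfs[0].unionByName(dfs[1]).unionByName(dfs[2]). Here is a small Spark-free sketch of that fold. The Frame type and the union_by_name helper are hypothetical stand-ins for a DataFrame and for DataFrame.unionByName, written only for this illustration.

```python
from functools import reduce
from typing import List, Tuple

Frame = Tuple[List[str], List[tuple]]  # (column names, rows)


def union_by_name(left: Frame, right: Frame) -> Frame:
    """Stand-in for unionByName: append right's rows, matching columns by name."""
    left_cols, left_rows = left
    right_cols, right_rows = right
    # Reorder each right-hand row so its values line up with the left-hand column names.
    positions = [right_cols.index(name) for name in left_cols]
    aligned = [tuple(row[p] for p in positions) for row in right_rows]
    return left_cols, left_rows + aligned


frames = [
    (["id", "list_a", "list_b"], [("A", "a", None), ("A", "c", None)]),
    (["id", "list_b", "list_a"], [("A", "1", None)]),  # same columns, different order
    (["id", "list_a", "list_b"], [("C", None, "1")]),
]

# reduce(f, [x, y, z]) evaluates f(f(x, y), z)
combined = reduce(union_by_name, frames)
print(combined)
```

Matching by name rather than by position is what keeps the values in the right columns even when the frames list their columns in a different order, as the second frame does here.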