PySpark: DataFrame with multiple array columns into multiple rows with one value each

We have a PySpark DataFrame with several columns containing arrays with multiple values. Our goal is to turn each value of these array columns into its own row, while keeping the other columns. So, starting with something like this:

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]
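
For reference, this can be loaded into a DataFrame as follows (the variable name df is our choice here, though the answers below use the same one):

df = spark.createDataFrame(data, ["id", "list_a", "list_b"])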

Which displays as:

+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

We would like to end up having:

+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |a     |null  |
|A  |c     |null  |
|A  |null  |1     |
|A  |null  |5     |
|B  |a     |null  |
|B  |b     |null  |
|C  |null  |1     |
+---+------+------+

We are thinking about several approaches:

  1. prefix each value with a column indicator, merge all the arrays into a single one, explode it, and reorganize the values back into separate columns
  2. split the dataframe into several, each with one of the array columns, explode that column, and then concatenate the dataframes

But all of them feel like dirty, complex, error-prone, and inefficient workarounds.

Does anyone have an idea about how to solve this in an elegant manner?

asked Sep 07 '21 by landoooo

People also ask

Which of the following operations can be used to split an array column into an individual DataFrame row for each element in the array?

To split array column data into individual rows, PySpark provides a function called explode().
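
For instance, a minimal sketch (the column names here are illustrative, not taken from the question):

from pyspark.sql.functions import explode

demo_df = spark.createDataFrame([("A", ["a", "c"])], ["id", "letters"])
demo_df.select("id", explode("letters").alias("letter")).show()
# +---+------+
# | id|letter|
# +---+------+
# |  A|     a|
# |  A|     c|
# +---+------+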

How do you explode rows in PySpark?

explode() – The PySpark function explode(e: Column) flattens an array or map column into rows. When an array column is passed, it creates a new column with the default name “col”, containing one array element per row; for a map column it produces “key” and “value” columns.
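
A short sketch of both cases (names are illustrative):

from pyspark.sql.functions import explode

arr_df = spark.createDataFrame([(1, ["x", "y"])], ["id", "arr"])
arr_df.select("id", explode("arr")).show()   # array: default output column is "col"

map_df = spark.createDataFrame([(1, {"a": "1"})], ["id", "m"])
map_df.select("id", explode("m")).show()     # map: default output columns are "key" and "value"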

How do you transpose columns to rows in PySpark DataFrame?

The Spark pivot() function rotates data from one DataFrame/Dataset column into multiple columns (rows to columns), and unpivoting transforms it back (columns to rows).
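
For example, a minimal pivot sketch (data and names are illustrative):

from pyspark.sql.functions import first

long_df = spark.createDataFrame(
    [("A", "list_a", "a"), ("A", "list_b", "1")], ["id", "src", "val"])

# rotate the distinct values of "src" into columns (rows -> columns)
long_df.groupBy("id").pivot("src").agg(first("val")).show()
# +---+------+------+
# | id|list_a|list_b|
# +---+------+------+
# |  A|     a|     1|
# +---+------+------+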

How do you split column values in PySpark?

PySpark SQL provides the split() function to convert a delimiter-separated string into an array (StringType to ArrayType) column on a DataFrame. The string column is split on a delimiter such as a space, comma, or pipe, and converted into an ArrayType column.
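
For example (illustrative names):

from pyspark.sql.functions import col, split

csv_df = spark.createDataFrame([("A", "a,c")], ["id", "letters_csv"])
csv_df.withColumn("letters", split(col("letters_csv"), ",")).show()
# +---+-----------+-------+
# | id|letters_csv|letters|
# +---+-----------+-------+
# |  A|        a,c| [a, c]|
# +---+-----------+-------+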


3 Answers

In case both columns list_a and list_b could be null, I would add a 4th case to the dataset:

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
    ("D", None, None),
]
df = spark.createDataFrame(data,["id","list_a","list_b"])

I would then split the original df into 3 (both nulls, list_a exploded, and list_b exploded) and then execute a unionByName:

from pyspark.sql.functions import col, explode_outer, lit

# rows where both list columns are null
dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())\
    .withColumn("list_a", lit(None))\
    .withColumn("list_b", lit(None))

# explode list_a, blank out list_b
df1 = df\
    .withColumn("list_a", explode_outer(col("list_a")))\
    .withColumn("list_b", lit(None))\
    .filter(~col("list_a").isNull())

# explode list_b, blank out list_a
df2 = df\
    .withColumn("list_b", explode_outer(col("list_b")))\
    .withColumn("list_a", lit(None))\
    .filter(~col("list_b").isNull())

merged_df = df1.unionByName(df2).unionByName(dfnulls)

merged_df.show()

+---+------+------+
| id|list_a|list_b|
+---+------+------+
|  A|     a|  null|
|  A|     c|  null|
|  B|     a|  null|
|  B|     b|  null|
|  A|  null|     1|
|  A|  null|     5|
|  C|  null|     1|
|  D|  null|  null|
+---+------+------+
answered Oct 16 '22 by ferran


The following approach, based on Scala, might help you.

Basically, explode each list column individually and join the resulting datasets on a dummy column to get the desired result.

import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}


// The "random_join_col" values ("<id>1" vs "<id>2") never match between df1 and df2,
// so the full outer join below effectively stacks the two exploded frames.
val df1 = inputDF
  .withColumn("list_a", explode_outer(col("list_a")))
  .withColumn("random_join_col", concat(col("id"), lit("1")))
  .drop("list_b")

val df2 = inputDF
  .withColumn("list_b", explode_outer(col("list_b")))
  .withColumn("random_join_col", concat(col("id"), lit("2")))
  .drop("list_a")


val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")

// Drop rows, if it got null value on both the list columns
finalDF.na.drop(how = "all", Seq("list_a","list_b")).orderBy("id").show(false)
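
For readers working in PySpark, a rough equivalent of the same join-based idea might look like this (a sketch assuming the df built from the question's data, not code from the original answer):

from pyspark.sql.functions import col, concat, explode_outer, lit

# explode each list column separately; the "1"/"2" suffix guarantees the dummy
# join keys never match, so the full outer join effectively stacks the two frames
df1 = (df.withColumn("list_a", explode_outer(col("list_a")))
         .withColumn("random_join_col", concat(col("id"), lit("1")))
         .drop("list_b"))

df2 = (df.withColumn("list_b", explode_outer(col("list_b")))
         .withColumn("random_join_col", concat(col("id"), lit("2")))
         .drop("list_a"))

final_df = (df1.join(df2, ["id", "random_join_col"], "full_outer")
               .drop("random_join_col"))

# drop rows that ended up with null in both list columns
final_df.na.drop(how="all", subset=["list_a", "list_b"]).orderBy("id").show(truncate=False)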
answered Oct 16 '22 by Sivakumar


Try this dynamic solution.

Input:

data = [
    ("A", ["a", "c"], ["1", "5"]),
    ("B", ["a", "b"], None),
    ("C", [], ["1"]),
]
df=spark.createDataFrame(data,["id","list_a","list_b"])
df.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

Let's create a list of DataFrames, one for each of the array columns in df. Initialize it with empty DataFrames and then overwrite each entry in the for loop. For each column, explode it, and for every other array column, replace the values with a NULL cast to string.

from pyspark.sql.functions import col, explode, expr
from pyspark.sql.types import *

array_cols = df.columns[1:]  # just ignoring the id column
c = 0
dfarr = [spark.createDataFrame([], schema=StructType()) for i in array_cols]
for i in array_cols:
    # explode the current array column ...
    dfarr[c] = df.withColumn(i, explode(col(i)))
    # ... and replace every other array column with a null cast to string
    for j in array_cols:
        if i != j:
            dfarr[c] = dfarr[c].withColumn(j, expr("cast(null as string)"))
    c = c + 1

Now dfarr is a list of DataFrames, each with a schema like:

dfarr[0].printSchema()
root
 |-- id: string (nullable = true)
 |-- list_a: string (nullable = true)
 |-- list_b: string (nullable = true)

dfarr[1].show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+

The datatypes in dfarr are all the same now, so we can just union all of them. For this we need the reduce function from functools:

from functools import reduce  
from pyspark.sql import DataFrame

def unionAll(*dfs):
    # fold unionByName across all the DataFrames
    return reduce(DataFrame.unionByName, dfs)

Applying it to our dfarr:

combo=unionAll(*dfarr)

combo.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |a     |null  |
|A  |c     |null  |
|B  |a     |null  |
|B  |b     |null  |
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+
answered Oct 16 '22 by stack0114106