PySpark - Convert column of Lists to Rows

I have a PySpark dataframe. I have to do a group by and then aggregate certain columns into a list so that I can apply a UDF to the data frame.

As an example, I have created a dataframe and then grouped by person.

import pyspark.sql.functions as F

a = [("Bob", 85.8, "Food", "2017-09-13"), ("Bob", 7.8, "Household", "2017-09-13"), ("Bob", 6.52, "Food", "2017-06-13")]
df = spark.createDataFrame(a, ["Person", "Amount", "Budget", "Date"])
df = df.groupby("Person").agg(F.collect_list(F.struct("Amount", "Budget", "Date")).alias("data"))
df.show(truncate=False)
+------+----------------------------------------------------------------------------+
|Person|data                                                                        |
+------+----------------------------------------------------------------------------+
|Bob   |[[85.8,Food,2017-09-13], [7.8,Household,2017-09-13], [6.52,Food,2017-06-13]]|
+------+----------------------------------------------------------------------------+ 

I have left out the UDF, but the resulting data frame from the UDF is below.

+------+--------------------------------------------------------------+
|Person|res                                                           |
+------+--------------------------------------------------------------+
|Bob   |[[562,Food,June,1], [380,Household,Sept,4], [880,Food,Sept,2]]|
+------+--------------------------------------------------------------+

I need to convert the resulting dataframe into one row per list element, with each field of the struct in its own column. This can be seen below.

+------+------+---------+-----+-------+
|Person|Amount|Budget   |Month|Cluster|
+------+------+---------+-----+-------+
|Bob   |562   |Food     |June |1      |
|Bob   |380   |Household|Sept |4      |
|Bob   |880   |Food     |Sept |2      |
+------+------+---------+-----+-------+
asked Feb 16 '18 by Bryce Ramgovind


People also ask

How do you explode columns in PySpark?

explode() – PySpark explode array or map column to rows. The PySpark function explode(e: Column) is used to explode an array or map column into rows. When an array is passed to this function, it creates a new default column named "col" that contains all the array elements.
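As a minimal sketch of the array case (the toy DataFrame and column names here are illustrative, and an active SparkSession named spark is assumed):

import pyspark.sql.functions as F

df = spark.createDataFrame([("Bob", [1, 2, 3])], ["name", "values"])
df.select("name", F.explode("values")).show(truncate=False)
# +----+---+
# |name|col|
# +----+---+
# |Bob |1  |
# |Bob |2  |
# |Bob |3  |
# +----+---+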

How to create array or map columns to rows in pyspark?

The same explode(e: Column) function handles both cases. When a map is passed, it creates two new columns, one for the key and one for the value, and each map entry becomes its own row.
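A similar sketch for the map case (again with illustrative data; note that the order of the exploded map entries is not guaranteed):

import pyspark.sql.functions as F

df = spark.createDataFrame([("Bob", {"a": 1, "b": 2})], ["name", "props"])
df.select("name", F.explode("props")).show(truncate=False)
# columns: name, key, value -- one row per map entry, e.g. (Bob, a, 1) and (Bob, b, 2)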

How to collect data from a pyspark Dataframe?

Method 1: Using flatMap()
1. dataframe is the PySpark dataframe
2. Column_Name is the column to be converted into the list
3. flatMap() is the method available on the RDD; it takes a lambda expression as a parameter and converts the column into a list
4. collect() is used to collect the data in the column
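Putting that together, a minimal sketch (the DataFrame and the Person column are illustrative):

df = spark.createDataFrame([("Bob",), ("Alice",)], ["Person"])
names = df.select("Person").rdd.flatMap(lambda row: row).collect()
# ['Bob', 'Alice'] -- each Row is iterable, so flatMap flattens it to its values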

How to convert a column to a list in Python?

Column_Name is the column to be converted into the list. map() is the method available on the RDD; it takes a lambda expression as a parameter and converts the column into a list. collect() is used to collect the data in the column. Example: Python code to convert a PySpark dataframe column to a list using the map() function.
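For example, a sketch of the map() variant on the same illustrative data:

df = spark.createDataFrame([("Bob",), ("Alice",)], ["Person"])
names = df.select("Person").rdd.map(lambda row: row[0]).collect()
# ['Bob', 'Alice'] -- map keeps one output element per Row, so we index into each Row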

How to explode an array in pyspark?

1. explode – PySpark explode array or map column to rows. The PySpark function explode(e: Column) is used to explode an array or map column into rows.
2. explode_outer – create rows for each element in an array or map; unlike explode, rows whose array or map is null or empty are kept and produce null.
3. posexplode – explode array or map elements to rows, together with a pos column holding the element position.
4. posexplode_outer – like posexplode, but rows whose array or map is null or empty are kept and produce null.
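A short sketch contrasting these variants (illustrative data; Alice's values is null on purpose):

import pyspark.sql.functions as F

df = spark.createDataFrame([("Bob", [10, 20]), ("Alice", None)], ["name", "values"])
df.select("name", F.explode("values")).show()        # Alice's row is dropped
df.select("name", F.explode_outer("values")).show()  # Alice is kept, with null in col
df.select("name", F.posexplode("values")).show()     # adds a pos column with the element index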


1 Answer

You can use explode and getItem as follows:

# starting from this form:
+------+--------------------------------------------------------------+
|Person|res                                                           |
+------+--------------------------------------------------------------+
|Bob   |[[562,Food,June,1], [380,Household,Sept,4], [880,Food,Sept,2]]|
+------+--------------------------------------------------------------+
import pyspark.sql.functions as F

# explode res to have one row for each item in res
exploded_df = df.select("*", F.explode("res").alias("exploded_data"))
exploded_df.show(truncate=False)

# then use getItem to create separate columns
exploded_df = exploded_df.withColumn(
    "Amount",
    F.col("exploded_data").getItem("Amount")  # get by field name, or by index e.g. getItem(0)
)

exploded_df = exploded_df.withColumn(
    "Budget",
    F.col("exploded_data").getItem("Budget")
)

exploded_df = exploded_df.withColumn(
    "Month",
    F.col("exploded_data").getItem("Month")
)

exploded_df = exploded_df.withColumn(
    "Cluster",
    F.col("exploded_data").getItem("Cluster")
)

exploded_df.select("Person", "Amount", "Budget", "Month", "Cluster").show(10, False)

+------+------+---------+-----+-------+
|Person|Amount|Budget   |Month|Cluster|
+------+------+---------+-----+-------+
|Bob   |562   |Food     |June |1      |
|Bob   |380   |Household|Sept |4      |
|Bob   |880   |Food     |Sept |2      |
+------+------+---------+-----+-------+

You can then drop the columns you no longer need (res and exploded_data). Hope this helps, good luck!
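As a side note, a more compact sketch of the same idea: explode once, alias the struct, then expand all of its fields with a star selection (assuming res is an array of structs with exactly those four fields):

import pyspark.sql.functions as F

# explode to one row per struct, then pull every struct field out as a top-level column
exploded_df = df.select("Person", F.explode("res").alias("d")).select("Person", "d.*")
exploded_df.show(truncate=False)  # yields Person, Amount, Budget, Month, Cluster directly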

answered Oct 15 '22 by mkaran