I have a Spark DataFrame read from a CSV file like this:
df = ss.read \
.format("csv") \
.option("delimiter", ";") \
.option("header", "false") \
.option("inferSchema", "true") \
.option("escape", "\"") \
.option("multiline", "true") \
.option("wholeFile", "true") \
.load(file_path)
The DataFrame looks like this:
|cod_cli|article_name|rank|
|123    |art_1       |1   |
|123    |art_2       |2   |
|123    |art_3       |3   |
|456    |art_4       |1   |
|456    |art_5       |2   |
|456    |art_6       |3   |
I want to group the rows by cod_cli and create one column per product in each group. Each value should be a dictionary whose keys are the column names and whose values are the corresponding column values, like this:
|cod_cli|Product 1                  |Product 2                  |Product 3                  |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
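To pin down the target shape, here is a plain-Python sketch (not Spark code) of the transformation on the sample rows: group by cod_cli, order each group by rank, and emit one dict per product. The helper name group_products and the in-memory rows list are hypothetical, for illustration only.

```python
from collections import defaultdict

# Sample rows from the table above: (cod_cli, article_name, rank)
rows = [
    (123, "art_1", 1), (123, "art_2", 2), (123, "art_3", 3),
    (456, "art_4", 1), (456, "art_5", 2), (456, "art_6", 3),
]

def group_products(rows):
    """Hypothetical helper: cod_cli -> {"Product N": {"cod_art": ..., "rank": ...}}."""
    groups = defaultdict(list)
    for cod_cli, article, rank in rows:
        groups[cod_cli].append((rank, article))
    result = {}
    for cod_cli, items in groups.items():
        items.sort()  # order each customer's products by rank
        result[cod_cli] = {
            "Product {}".format(i + 1): {"cod_art": article, "rank": rank}
            for i, (rank, article) in enumerate(items)
        }
    return result

print(group_products(rows)[123]["Product 2"])  # {'cod_art': 'art_2', 'rank': 2}
```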
The dictionary can be either a string (preferred) or a map. I tried this:
df = df \
.groupBy(F.col("cod_cli")) \
.agg(F.collect_list(F.array("cod_art","rank")))
But this creates a single column holding an array of all the grouped elements, not one column per product.
Can anyone help me?
Thank you
UPDATE
The proposed solution is this:
df = df.withColumn(
"Product",
F.to_json(
F.struct(F.col("cod_art"), F.col("rank"))
)
)
This creates a column "Product" holding the desired JSON string, for example {"cod_art":"art_1","rank":1}.
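Spark's to_json writes a struct compactly, without spaces, and keeps rank as a number because it is an integer column. Python's json.dumps with compact separators gives a feel for the resulting string; this is only an analogy, not Spark's implementation, and product_json is a hypothetical helper.

```python
import json

def product_json(cod_art, rank):
    """Hypothetical helper mimicking the string F.to_json(F.struct(...)) yields."""
    # Compact separators: no spaces after ',' or ':'
    return json.dumps({"cod_art": cod_art, "rank": rank}, separators=(",", ":"))

print(product_json("art_1", 1))  # {"cod_art":"art_1","rank":1}
```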
Then:
df = df \
.groupBy(F.col("cod_cli")) \
.pivot("rank") \
.agg(F.first("Product"))
This creates one column per distinct rank value, grouped by cod_cli, and also handles customers with more than 3 products, since pivot adds a column for every rank it finds:
|cod_cli|1                           |2                           |3                           |
|123    |{"cod_art":"art_1","rank":1}|{"cod_art":"art_2","rank":2}|{"cod_art":"art_3","rank":3}|
|456    |{"cod_art":"art_4","rank":1}|{"cod_art":"art_5","rank":2}|{"cod_art":"art_6","rank":3}|
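What pivot("rank") followed by first("Product") does can be sketched in plain Python: one output column per distinct rank, and None (Spark's null) where a customer has no product at that rank. The name pivot_first and the extra art_7 row are hypothetical. When pivot is called without a list of values, Spark also has to find the distinct values first, which the sketch models with sorted({...}).

```python
from collections import defaultdict

def pivot_first(rows):
    """Hypothetical sketch of groupBy(key).pivot(col).agg(first(value)).

    rows: iterable of (cod_cli, rank, product_json) tuples.
    Returns (columns, table) where table maps cod_cli -> list of cell values.
    """
    rows = list(rows)
    # Like pivot() without explicit values: discover the distinct ranks first
    columns = sorted({rank for _, rank, _ in rows})
    cells = defaultdict(dict)
    for cod_cli, rank, product in rows:
        # first(): keep the first value seen for each (cod_cli, rank) cell
        cells[cod_cli].setdefault(rank, product)
    table = {
        cod_cli: [by_rank.get(rank) for rank in columns]  # missing cell -> None
        for cod_cli, by_rank in cells.items()
    }
    return columns, table

rows = [
    (123, 1, '{"cod_art":"art_1","rank":1}'),
    (123, 2, '{"cod_art":"art_2","rank":2}'),
    (456, 1, '{"cod_art":"art_4","rank":1}'),
    (456, 4, '{"cod_art":"art_7","rank":4}'),  # hypothetical 4th-ranked product
]
columns, table = pivot_first(rows)
print(columns)  # [1, 2, 4]
```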
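You can do this without pivot, which is relatively expensive (when no list of values is given, Spark must first compute the distinct pivot values internally), by using collect_list over a struct, then to_json with create_map.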
from pyspark.sql import functions as F

# collect_list gives no order guarantee: put rank first and sort_array by it
(df.groupBy("cod_cli")
    .agg(F.sort_array(F.collect_list(F.struct("rank", "article_name"))).alias("arr"))
    .select("cod_cli", *(
        F.to_json(F.create_map(  # map values are coerced to one type (string)
            F.lit("cod_art"), F.col("arr.article_name")[x],
            F.lit("rank"), F.col("arr.rank")[x]))
        .alias("Product{}".format(x + 1)) for x in range(3)))
    .show(truncate=False))
#+-------+------------------------------+------------------------------+------------------------------+
#|cod_cli|Product1 |Product2 |Product3 |
#+-------+------------------------------+------------------------------+------------------------------+
#|123 |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
#|456 |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
#+-------+------------------------------+------------------------------+------------------------------+
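For intuition about the fixed-width output above, here is a plain-Python sketch (not Spark code) of the same steps: collect each customer's (rank, article) pairs, sort them by rank, and emit exactly n columns. Because create_map needs a single value type, rank ends up as a string, hence "rank":"1" in the output. The helper fixed_products is hypothetical, and it pads customers with fewer than n products with None as an assumption; check how your Spark version treats out-of-range array indices (with ANSI mode enabled they raise an error).

```python
import json
from collections import defaultdict

def fixed_products(rows, n=3):
    """Hypothetical sketch of collect_list + sort + n fixed ProductN columns.

    rows: iterable of (cod_cli, article_name, rank) tuples.
    """
    groups = defaultdict(list)
    for cod_cli, article, rank in rows:
        groups[cod_cli].append((rank, article))  # like collect_list(struct(...))
    out = {}
    for cod_cli, items in groups.items():
        items.sort()  # like sort_array on structs whose first field is rank
        row = {}
        for x in range(n):
            key = "Product{}".format(x + 1)
            if x < len(items):
                rank, article = items[x]
                # create_map coerces values to one type, so rank becomes a string
                row[key] = json.dumps({"cod_art": article, "rank": str(rank)},
                                      separators=(",", ":"))
            else:
                row[key] = None  # assumption: no product at this position
        out[cod_cli] = row
    return out

# Deliberately unordered input to show why sorting matters
rows = [(123, "art_2", 2), (123, "art_1", 1), (123, "art_3", 3), (456, "art_4", 1)]
print(fixed_products(rows)[123]["Product1"])  # {"cod_art":"art_1","rank":"1"}
```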