I have a question similar to this but the number of columns to be operated by collect_list is given by a name list. For example:
scala> w.show
+---+-----+----+-----+
|iid|event|date|place|
+---+-----+----+-----+
| A| D1| T0| P1|
| A| D0| T1| P2|
| B| Y1| T0| P3|
| B| Y2| T2| P3|
| C| H1| T0| P5|
| C| H0| T9| P5|
| B| Y0| T1| P2|
| B| H1| T3| P6|
| D| H1| T2| P4|
+---+-----+----+-----+
scala> val combList = List("event", "date", "place")
combList: List[String] = List(event, date, place)
scala> val v = w.groupBy("iid").agg(collect_list(combList(0)), collect_list(combList(1)), collect_list(combList(2)))
v: org.apache.spark.sql.DataFrame = [iid: string, collect_list(event): array<string> ... 2 more fields]
scala> v.show
+---+-------------------+------------------+-------------------+
|iid|collect_list(event)|collect_list(date)|collect_list(place)|
+---+-------------------+------------------+-------------------+
| B| [Y1, Y2, Y0, H1]| [T0, T2, T1, T3]| [P3, P3, P2, P6]|
| D| [H1]| [T2]| [P4]|
| C| [H1, H0]| [T0, T9]| [P5, P5]|
| A| [D1, D0]| [T0, T1]| [P1, P2]|
+---+-------------------+------------------+-------------------+
Is there any way I can apply collect_list to multiple columns inside agg without knowing the number of elements in the combList prior?
Grouping on Multiple Columns in PySpark can be performed by passing two or more columns to the groupBy() method, this returns a pyspark. sql. GroupedData object which contains agg(), sum(), count(), min(), max(), avg() e.t.c to perform aggregations.
You can select the single or multiple columns of the Spark DataFrame by passing the column names you wanted to select to the select() function. Since DataFrame is immutable, this creates a new DataFrame with a selected columns. show() function is used to show the DataFrame contents.
The Spark function collect_list() is used to aggregate the values into an ArrayType typically after group by and window partition.
If you want to flatten the arrays, use flatten function which converts array of array columns to a single array on DataFrame.
You can use collect_list(struct(col1, col2)) AS elements.
Example:
df.select("cd_issuer", "cd_doc", "cd_item", "nm_item").printSchema
val outputDf = spark.sql(s"SELECT cd_issuer, cd_doc, collect_list(struct(cd_item, nm_item)) AS item FROM teste GROUP BY cd_issuer, cd_doc")
outputDf.printSchema
df
|-- cd_issuer: string (nullable = true)
|-- cd_doc: string (nullable = true)
|-- cd_item: string (nullable = true)
|-- nm_item: string (nullable = true)
outputDf
|-- cd_issuer: string (nullable = true)
|-- cd_doc: string (nullable = true)
|-- item: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- cd_item: string (nullable = true)
| | |-- nm_item: string (nullable = true)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With