 

Spark GroupBy agg collect_list multiple columns

I have a question similar to this one, but the number of columns to be operated on by collect_list is given by a list of names. For example:

scala> w.show
+---+-----+----+-----+
|iid|event|date|place|
+---+-----+----+-----+
|  A|   D1|  T0|   P1|
|  A|   D0|  T1|   P2|
|  B|   Y1|  T0|   P3|
|  B|   Y2|  T2|   P3|
|  C|   H1|  T0|   P5|
|  C|   H0|  T9|   P5|
|  B|   Y0|  T1|   P2|
|  B|   H1|  T3|   P6|
|  D|   H1|  T2|   P4|
+---+-----+----+-----+


scala> val combList = List("event", "date", "place")
combList: List[String] = List(event, date, place)

scala> val v = w.groupBy("iid").agg(collect_list(combList(0)), collect_list(combList(1)), collect_list(combList(2)))
v: org.apache.spark.sql.DataFrame = [iid: string, collect_list(event): array<string> ... 2 more fields]

scala> v.show
+---+-------------------+------------------+-------------------+
|iid|collect_list(event)|collect_list(date)|collect_list(place)|
+---+-------------------+------------------+-------------------+
|  B|   [Y1, Y2, Y0, H1]|  [T0, T2, T1, T3]|   [P3, P3, P2, P6]|
|  D|               [H1]|              [T2]|               [P4]|
|  C|           [H1, H0]|          [T0, T9]|           [P5, P5]|
|  A|           [D1, D0]|          [T0, T1]|           [P1, P2]|
+---+-------------------+------------------+-------------------+

Is there any way to apply collect_list to multiple columns inside agg without knowing the number of elements in combList in advance?

asked Feb 13 '18 by Jonathan


People also ask

How do you do groupBy on multiple columns in Pyspark?

Grouping on multiple columns in PySpark is done by passing two or more columns to the groupBy() method. This returns a pyspark.sql.GroupedData object, which provides agg(), sum(), count(), min(), max(), avg(), etc. to perform aggregations.
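Since this thread is in Scala, here is a minimal Scala equivalent of the same idea, using hypothetical sales data:

import org.apache.spark.sql.functions._
import spark.implicits._  // already in scope in spark-shell

// hypothetical data, for illustration only
val sales = Seq(("US", 2021, 100), ("US", 2021, 50), ("EU", 2022, 75))
  .toDF("region", "year", "amount")

// pass several column names to groupBy, then aggregate via agg()
sales.groupBy("region", "year")
  .agg(sum("amount").as("total"), count("*").as("n"))
  .show()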

How do I select multiple columns in Spark?

You can select one or more columns of a Spark DataFrame by passing the column names you want to the select() function. Since DataFrames are immutable, this creates a new DataFrame with the selected columns. The show() function displays the DataFrame contents.
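In Scala, using the w DataFrame from the question above (note the varargs overload, which is the same trick needed for a dynamic combList):

// pick columns by name; w is left unchanged, a new DataFrame is returned
val subset = w.select("iid", "event")
subset.show()

// select() also has a varargs overload, useful when names arrive in a list
val cols = List("iid", "date")
w.select(cols.head, cols.tail: _*).show()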

What does collect_list do in Spark?

The Spark function collect_list() aggregates values into an ArrayType column, typically after a groupBy or over a window partition.
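For example, with the w DataFrame from the question (collect_list keeps duplicates and does not guarantee element order; collect_set drops duplicates):

import org.apache.spark.sql.functions.collect_list

// one array of places per iid; row B keeps the duplicate P3
w.groupBy("iid").agg(collect_list("place").as("places")).show()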

How do you flatten an array in Pyspark?

If you want to flatten nested arrays, use the flatten function, which converts an array-of-arrays column into a single array.
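A minimal sketch; flatten lives in org.apache.spark.sql.functions and requires Spark 2.4+, and the nested DataFrame here is hypothetical:

import org.apache.spark.sql.functions.flatten
import spark.implicits._  // for toDF and $; already in scope in spark-shell

// hypothetical nested data: array<array<string>>
val nested = Seq((1, Seq(Seq("a", "b"), Seq("c")))).toDF("id", "arrs")
nested.select($"id", flatten($"arrs").as("flat")).show()  // flat = [a, b, c]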


1 Answer

You can collect several columns at once by wrapping them in a struct: collect_list(struct(col1, col2)) AS elements.

Example:

df.select("cd_issuer", "cd_doc", "cd_item", "nm_item").printSchema
// register the DataFrame as a temp view so it can be queried with SQL
df.createOrReplaceTempView("teste")
val outputDf = spark.sql("SELECT cd_issuer, cd_doc, collect_list(struct(cd_item, nm_item)) AS item FROM teste GROUP BY cd_issuer, cd_doc")
outputDf.printSchema

df
root
 |-- cd_issuer: string (nullable = true)
 |-- cd_doc: string (nullable = true)
 |-- cd_item: string (nullable = true)
 |-- nm_item: string (nullable = true)

outputDf
root
 |-- cd_issuer: string (nullable = true)
 |-- cd_doc: string (nullable = true)
 |-- item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cd_item: string (nullable = true)
 |    |    |-- nm_item: string (nullable = true)
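The struct trick above answers the pairing part; for the dynamic part of the original question (a combList of unknown length), one option is to map each name to a collect_list expression and splat the resulting list into agg, which accepts a first Column plus varargs. A minimal sketch against the w DataFrame from the question:

import org.apache.spark.sql.functions.collect_list

val combList = List("event", "date", "place")
// one collect_list aggregation per column name
val aggExprs = combList.map(c => collect_list(c).as(s"${c}_list"))

// agg(first, rest: _*) works for any number of expressions
val v = w.groupBy("iid").agg(aggExprs.head, aggExprs.tail: _*)
v.show()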
answered Sep 16 '22 by Rodrigo Fritsch