
How to use collect_set and collect_list functions in windowed aggregation in Spark 1.6?

In Spark 1.6.0 / Scala, is it possible to use collect_list("colC") or collect_set("colC") as a windowed aggregate, i.e. collect_list("colC").over(Window.partitionBy("colA").orderBy("colB"))?

Dzmitry Haikov asked Jul 16 '17 17:07



1 Answer

Given that you have a dataframe such as

+----+----+----+
|colA|colB|colC|
+----+----+----+
|1   |1   |23  |
|1   |2   |63  |
|1   |3   |31  |
|2   |1   |32  |
|2   |2   |56  |
+----+----+----+

You can use window functions by doing the following

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
df.withColumn("colD", collect_list("colC").over(Window.partitionBy("colA").orderBy("colB"))).show(false)

Result:

+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23]        |
|1   |2   |63  |[23, 63]    |
|1   |3   |31  |[23, 63, 31]|
|2   |1   |32  |[32]        |
|2   |2   |56  |[32, 56]    |
+----+----+----+------------+
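
The growing lists come from the window frame: when a window has an orderBy and no explicit frame, Spark SQL uses a frame that runs from the start of the partition up to the current row (a range frame, so rows that tie on colB would be included together). The following is only a plain-Scala sketch of that behaviour, with no Spark involved. The names Row, RunningFrame and runningLists are made up for illustration, and it assumes colB is unique within each colA group, as in the sample data.

```scala
// Plain Scala model of collect_list over partitionBy(colA).orderBy(colB).
// Not Spark code: Row, RunningFrame and runningLists are illustrative names.
case class Row(colA: Int, colB: Int, colC: Int)

object RunningFrame {
  val rows: Seq[Row] = Seq(
    Row(1, 1, 23), Row(1, 2, 63), Row(1, 3, 31),
    Row(2, 1, 32), Row(2, 2, 56)
  )

  // For each colA partition, sort by colB and pair every row with the
  // colC values seen from the start of the partition up to that row.
  def runningLists(data: Seq[Row]): Seq[(Row, List[Int])] =
    data.groupBy(_.colA).toSeq.sortBy(_._1).flatMap { case (_, part) =>
      val sorted = part.sortBy(_.colB)
      // scanLeft yields every prefix; drop the initial empty one.
      val prefixes =
        sorted.scanLeft(List.empty[Int])((acc, r) => acc :+ r.colC).tail
      sorted.zip(prefixes)
    }

  def main(args: Array[String]): Unit =
    runningLists(rows).foreach { case (r, colD) =>
      println(s"${r.colA} ${r.colB} ${r.colC} ${colD.mkString("[", ", ", "]")}")
    }
}
```

Each row is paired with the prefix of its partition, which matches the colD column shown above.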

The result for collect_set is similar, but the elements within each set are not guaranteed to appear in any particular order, unlike with collect_list

df.withColumn("colD", collect_set("colC").over(Window.partitionBy("colA").orderBy("colB"))).show(false)
+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23]        |
|1   |2   |63  |[63, 23]    |
|1   |3   |31  |[63, 31, 23]|
|2   |1   |32  |[32]        |
|2   |2   |56  |[56, 32]    |
+----+----+----+------------+
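
If you need the set printed in a predictable order, one option is to sort the resulting array by value. This is a sketch rather than part of the original answer: it assumes the same df as above, uses the built-in sort_array function (ascending by default), and SortedSetExample and withSortedSet are hypothetical names. I have not checked it on a 1.6 cluster.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_set, sort_array}

// Hypothetical helper, named here only for illustration.
object SortedSetExample {
  // Running set of colC per colA, sorted ascending by value so the
  // displayed order no longer depends on how the set is stored.
  def withSortedSet(df: DataFrame): DataFrame = {
    val w = Window.partitionBy("colA").orderBy("colB")
    df.withColumn("colD", sort_array(collect_set("colC").over(w)))
  }
}
```

Calling SortedSetExample.withSortedSet(df).show(false) should then list each set in ascending value order, which is not the same as colB order.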

If you remove orderBy as below

df.withColumn("colD", collect_list("colC").over(Window.partitionBy("colA"))).show(false)

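the result would be as follows. Without orderBy the window frame covers the whole partition, so every row gets the complete list (and the order of elements within that list is no longer guaranteed)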

+----+----+----+------------+
|colA|colB|colC|colD        |
+----+----+----+------------+
|1   |1   |23  |[23, 63, 31]|
|1   |2   |63  |[23, 63, 31]|
|1   |3   |31  |[23, 63, 31]|
|2   |1   |32  |[32, 56]    |
|2   |2   |56  |[32, 56]    |
+----+----+----+------------+
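
If you want the complete list on every row but still in colB order, one option is to keep orderBy and widen the frame explicitly. This is a sketch that assumes the same df as above. In Spark 1.6 the frame bounds are passed as Long.MinValue (unbounded preceding) and Long.MaxValue (unbounded following), and as far as I know window functions in 1.6 need a HiveContext rather than a plain SQLContext. FullFrameExample and withFullList are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.collect_list

// Hypothetical helper, named here only for illustration.
object FullFrameExample {
  // Adds colD holding every colC of the partition, in colB order, on each row.
  def withFullList(df: DataFrame): DataFrame = {
    val w = Window
      .partitionBy("colA")
      .orderBy("colB")
      // Long.MinValue = unbounded preceding, Long.MaxValue = unbounded following
      .rowsBetween(Long.MinValue, Long.MaxValue)
    df.withColumn("colD", collect_list("colC").over(w))
  }
}
```

FullFrameExample.withFullList(df).show(false) should give every colA = 1 row the list [23, 63, 31], this time because the frame was set explicitly rather than left to the default.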

I hope the answer is helpful

Ramesh Maharjan answered Sep 21 '22 16:09