
How to limit functions.collect_set in Spark SQL?

I'm dealing with a column of numbers in a large Spark DataFrame, and I would like to create a new column that stores an aggregated list of the unique numbers that appear in that column.

That's basically exactly what functions.collect_set does. However, I only need up to 1000 elements in the aggregated list. Is there any way to pass that limit to functions.collect_set(), or any other way to get at most 1000 elements in the aggregated list, without using a UDAF?

Since the column is so large, I'd like to avoid collecting all elements and trimming the list afterwards.

Thanks!

asked Aug 02 '16 by user1500142


1 Answer

Spark 2.4

As pointed out in a comment, Spark 2.4.0 comes with the slice standard function, which can do exactly this sort of thing.

val usage = sql("describe function slice").as[String].collect()(2)
scala> println(usage)
Usage: slice(x, start, length) - Subsets array x starting from index start (array indices start at 1, or starting from the end if start is negative) with the specified length.
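
The input dataset isn't shown in the original answer; as a sketch (name and values assumed, mirroring the sample dataset built later in this answer), it could be created like this:

// Hypothetical setup: 50 ids spread across 5 keys, matching the output shown below
val input = spark.range(50).withColumn("key", $"id" % 5)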

That gives the following query:

val q = input
  .groupBy('key)
  .agg(collect_set('id) as "collect")
  .withColumn("three_only", slice('collect, 1, 3))
scala> q.show(truncate = false)
+---+--------------------------------------+------------+
|key|collect                               |three_only  |
+---+--------------------------------------+------------+
|0  |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1  |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3  |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2  |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4  |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
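
As a side note not in the original answer: slice is also available as a plain SQL expression, so the trimming can be folded into the aggregation call itself (a sketch, assuming the same input dataset as above):

val q2 = input
  .groupBy('key)
  // slice applied directly to the SQL form of collect_set
  .agg(expr("slice(collect_set(id), 1, 3)") as "three_only")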

Before Spark 2.4

I'd use a UDF that does what you want after collect_set (or collect_list), or a much harder-to-write UDAF.

Given more experience with UDFs than UDAFs, I'd go with the UDF first. Even though UDFs are not optimized by Catalyst, for this use case that's fine.

// trims an already-aggregated collection down to at most `limit` elements
val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) }
// 50 ids spread across 5 keys (id % 5)
val sample = spark.range(50).withColumn("key", $"id" % 5)

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false)
+---+--------------------------------------+
|key|all                                   |
+---+--------------------------------------+
|0  |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|
|1  |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|
|3  |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|
|2  |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|
|4  |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|
+---+--------------------------------------+

scala> sample.
  groupBy("key").
  agg(collect_set("id") as "all").
  withColumn("limit(3)", limitUDF($"all", lit(3))).
  show(false)
+---+--------------------------------------+------------+
|key|all                                   |limit(3)    |
+---+--------------------------------------+------------+
|0  |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1  |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3  |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2  |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4  |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+

See the functions object for the udf function's docs.
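
If you'd rather express the pre-2.4 version in SQL, one option (a sketch, not from the original answer; limit_set is an assumed name) is to register the same function and nest it around collect_set:

// Register the lambda under an assumed name so it can be used in SQL text
spark.udf.register("limit_set", (nums: Seq[Long], limit: Int) => nums.take(limit))
sample.createOrReplaceTempView("sample")
spark.sql("SELECT key, limit_set(collect_set(id), 3) AS limited FROM sample GROUP BY key").show(false)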

answered Sep 17 '22 by Jacek Laskowski