I'm dealing with a column of numbers in a large Spark DataFrame, and I would like to create a new column that stores an aggregated list of the unique numbers that appear in that column.
Basically exactly what functions.collect_set does. However, I only need up to 1000 elements in the aggregated list. Is there any way to pass that parameter somehow to functions.collect_set(), or any other way to get at most 1000 elements in the aggregated list, without using a UDAF?
Since the column is so large, I'd like to avoid collecting all elements and trimming the list afterwards.
Thanks!
As pointed out in a comment, Spark 2.4.0 comes with the slice standard function, which can do this sort of thing.
val usage = sql("describe function slice").as[String].collect()(2)
scala> println(usage)
Usage: slice(x, start, length) - Subsets array x starting from index start (array indices start at 1, or starting from the end if start is negative) with the specified length.
That gives the following query:
// slice lives in org.apache.spark.sql.functions (available since Spark 2.4.0)
import org.apache.spark.sql.functions.{collect_set, slice}

// input is the same sample dataset built below: ids 0-49 keyed by id % 5
val q = input
  .groupBy('key)
  .agg(collect_set('id) as "collect")
  .withColumn("three_only", slice('collect, 1, 3))
scala> q.show(truncate = false)
+---+--------------------------------------+------------+
|key|collect |three_only |
+---+--------------------------------------+------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
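Applied back to the question's 1000-element cap, the same idea can be written directly in the aggregation. This is just a sketch, with key and id standing in for your actual grouping and value columns:

import org.apache.spark.sql.functions.{collect_set, slice}

// Keep at most the first 1000 elements of each group's set of distinct ids.
val limited = input
  .groupBy('key)
  .agg(slice(collect_set('id), 1, 1000) as "up_to_1000_ids")

Note that collect_set still gathers the full set of distinct values per group; slice trims the result afterwards rather than limiting the aggregation itself.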
I'd use a UDF that would do what you want after collect_set (or collect_list), or a much harder UDAF.
Given more experience with UDFs, I'd go with that approach first. Even though UDFs are not optimized, it's fine for this use case.
// UDF that keeps at most `limit` elements of the aggregated array
val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) }
// Sample data: ids 0-49 bucketed into 5 keys
val sample = spark.range(50).withColumn("key", $"id" % 5)
scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false)
+---+--------------------------------------+
|key|all |
+---+--------------------------------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|
+---+--------------------------------------+
scala> sample.
groupBy("key").
agg(collect_set("id") as "all").
withColumn("limit(3)", limitUDF($"all", lit(3))).
show(false)
+---+--------------------------------------+------------+
|key|all |limit(3) |
+---+--------------------------------------+------------+
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] |
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] |
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]|
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]|
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] |
+---+--------------------------------------+------------+
See the functions object (for the udf function's docs).
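The same function can also be exposed to Spark SQL. Here is a minimal sketch, assuming the sample DataFrame from above; the UDF name limit_set and the temp view name t are made up for this example:

// Register the limit-taking function as a SQL UDF (the name is arbitrary).
spark.udf.register("limit_set", (nums: Seq[Long], limit: Int) => nums.take(limit))

// Expose the sample DataFrame to SQL and cap each group's set at 1000 elements.
sample.createOrReplaceTempView("t")
spark.sql("SELECT key, limit_set(collect_set(id), 1000) AS up_to_1000 FROM t GROUP BY key").show(false)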