Given the following DataFrame:
+----+-----+---+-----+
| uid|    k|  v|count|
+----+-----+---+-----+
|   a|pref1|  b|  168|
|   a|pref3|  h|  168|
|   a|pref3|  t|   63|
|   a|pref3|  k|   84|
|   a|pref1|  e|   84|
|   a|pref2|  z|  105|
+----+-----+---+-----+
How can I get the max value from uid, k but include v?
+----+-----+---+----------+
| uid|    k|  v|max(count)|
+----+-----+---+----------+
|   a|pref1|  b|       168|
|   a|pref3|  h|       168|
|   a|pref2|  z|       105|
+----+-----+---+----------+
I can do something like this but it will drop the column "v" :
df.groupBy("uid", "k").max("count")
The GROUP BY statement is often used with aggregate functions ( COUNT() , MAX() , MIN() , SUM() , AVG() ) to group the result-set by one or more columns.
If you're working with MySQL, you can combine MAX() with the GREATEST() function to get the biggest value from two or more fields. Here's the syntax for GREATEST: GREATEST(value1,value2,...) Given two or more arguments, it returns the largest (maximum-valued) argument.
How do you get max for each group in SQL? To find the maximum value of a column, use the MAX() aggregate function; it takes a column name or an expression to find the maximum value. In our example, the subquery returns the highest number in the column grade (subquery: SELECT MAX(grade) FROM student ).
For example, if we have a matrix M that contains 2 rows and 2 columns with values 1, 2 in the first row and 3, 4 in the second row then the maximum for each of the columns in that matrix can be found by using the syntax; apply(M,2,max), hence the result will be 3, 4.
It's the perfect example for window operators (using over function) or join.
Since you've already figured out how to use windows, I focus on join exclusively.
scala> val inventory = Seq(
     |   ("a", "pref1", "b", 168),
     |   ("a", "pref3", "h", 168),
     |   ("a", "pref3", "t",  63)).toDF("uid", "k", "v", "count")
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields]
scala> val maxCount = inventory.groupBy("uid", "k").max("count")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]
scala> maxCount.show
+---+-----+----------+
|uid|    k|max(count)|
+---+-----+----------+
|  a|pref3|       168|
|  a|pref1|       168|
+---+-----+----------+
scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]
scala> maxCount.show
+---+-----+---+
|uid|    k|max|
+---+-----+---+
|  a|pref3|168|
|  a|pref1|168|
+---+-----+---+
scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show
+---+-----+---+---+-----+
|uid|    k|max|  v|count|
+---+-----+---+---+-----+
|  a|pref3|168|  h|  168|
|  a|pref1|168|  b|  168|
+---+-----+---+---+-----+
Here's the best solution I came up with so far:
val w = Window.partitionBy("uid","k").orderBy(col("count").desc)
df.withColumn("rank", dense_rank().over(w)).select("uid", "k","v","count").where("rank == 1").show
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With