
How to compare multiple rows?

I'd like to compare each pair of consecutive rows, i-1 and i, on col2 (with the rows sorted by col1).

If the item in row i differs from the item in row i-1, I'd like to increment the count for the item in row i-1 by 1.

+--------------+
| col1 col2    |
+--------------+
| row_1 item_1 |
| row_2 item_1 |
| row_3 item_2 |
| row_4 item_1 |
| row_5 item_2 |
| row_6 item_1 |
+--------------+

In the example above, scanning downwards two rows at a time, row_2 and row_3 differ, so we add one to item_1. Next, row_3 differs from row_4, so we add one to item_2. Continuing to the last row, we end up with:

+-------------+
|  col2  col3 |
+-------------+
|  item_1  2  |
|  item_2  2  |
+-------------+
asked Aug 21 '15 19:08 by quickinsights



1 Answer

You can do this by combining a window function with an aggregate. The window function fetches the next value of col2 (ordered by col1), and the aggregate then counts how many times the current and next values differ. The code below implements this:

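// spark-shell already has these imports in scope; a standalone application needs them.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lead, sum}
import spark.implicits._  // assumes a SparkSession named `spark`

val data = Seq(
  ("row_1", "item_1"),
  ("row_2", "item_1"),
  ("row_3", "item_2"),
  ("row_4", "item_1"),
  ("row_5", "item_2"),
  ("row_6", "item_1")).toDF("col1", "col2")

// col2_next holds the next row's col2 in col1 order. The last row has no
// successor, so coalesce falls back to its own col2 and it never counts as a change.
// The aggregate then sums, per item, the rows whose next value differs.
val q = data.
  withColumn("col2_next",
    coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")).
  groupBy($"col2").
  agg(sum($"col2" =!= $"col2_next" cast "int") as "col3")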

scala> q.show
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+----+
|  col2|col3|
+------+----+
|item_1|   2|
|item_2|   2|
+------+----+
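
To check the counting logic without a Spark session, the same idea can be sketched with plain Scala collections. This is only an illustration under assumptions: the object name TransitionCount and the method countChanges are made up for this example, and it sorts col1 as plain strings, so it relies on the row labels sorting in the intended order.

```scala
object TransitionCount {
  // rows are (col1, col2) pairs, mirroring the columns in the question.
  def countChanges(rows: Seq[(String, String)]): Map[String, Int] = {
    // Order by col1 (string comparison), then keep only the col2 values.
    val items = rows.sortBy(_._1).map(_._2)
    // Start every item at 0 so items that never change still appear in the result.
    val zero = items.distinct.map(_ -> 0).toMap
    // Pair each item with its successor; the last item has no pair and adds nothing.
    items.zip(items.drop(1)).foldLeft(zero) {
      case (acc, (prev, next)) if prev != next => acc.updated(prev, acc(prev) + 1)
      case (acc, _)                            => acc
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ("row_1", "item_1"),
      ("row_2", "item_1"),
      ("row_3", "item_2"),
      ("row_4", "item_1"),
      ("row_5", "item_2"),
      ("row_6", "item_1"))
    // For the sample data the counts are item_1 -> 2 and item_2 -> 2.
    println(countChanges(rows))
  }
}
```

Pairing each element with its successor plays the role of lead, and the fold plays the role of the grouped sum. Unlike the Spark version, this runs on a single machine and is only meant for verifying small inputs.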
answered Sep 20 '22 12:09 by Herman