I'd like to compare each row i with its predecessor i-1 on col2 (sorted by col1). If item_i of the i-th row differs from item_[i-1] of the (i-1)-th row, I'd like to increment the count of item_[i-1] by 1.
+--------------+
| col1 col2 |
+--------------+
| row_1 item_1 |
| row_2 item_1 |
| row_3 item_2 |
| row_4 item_1 |
| row_5 item_2 |
| row_6 item_1 |
+--------------+
In the above example, if we scan two rows at a time downwards, we see that col2 of row_2 and row_3 differ, so we add one to the count of item_1. Next, row_3 differs from row_4, so we add one to item_2. Continuing to the end, we end up with:
+-------------+
| col2 col3 |
+-------------+
| item_1 2 |
| item_2 2 |
+-------------+
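To pin down the intended logic, here is a minimal sketch of the same pairwise scan in plain Scala collections (no Spark involved), assuming the rows are already sorted by col1; it only illustrates the intended semantics:

val rows = Seq(
  ("row_1", "item_1"), ("row_2", "item_1"), ("row_3", "item_2"),
  ("row_4", "item_1"), ("row_5", "item_2"), ("row_6", "item_1"))

// Slide a window of two over col2; whenever a pair differs,
// credit the earlier item of the pair.
val counts = rows.map(_._2)
  .sliding(2)
  .collect { case Seq(prev, next) if prev != next => prev }
  .toSeq
  .groupBy(identity)
  .map { case (item, hits) => item -> hits.size }
// counts == Map("item_1" -> 2, "item_2" -> 2)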
You can use a combination of a window function and an aggregate to do this. The window function is used to get the next value of col2 (using col1 for ordering). The aggregate then counts the times we encounter a difference. This is implemented in the code below:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // spark: SparkSession; already in scope in spark-shell

val data = Seq(
  ("row_1", "item_1"),
  ("row_2", "item_1"),
  ("row_3", "item_2"),
  ("row_4", "item_1"),
  ("row_5", "item_2"),
  ("row_6", "item_1")).toDF("col1", "col2")

val q = data.
  // For every row, look up the next value of col2 in col1 order; the last
  // row has no successor, so fall back to its own value (no change counted).
  withColumn("col2_next",
    coalesce(lead($"col2", 1) over Window.orderBy($"col1"), $"col2")).
  groupBy($"col2").
  // A difference becomes 1, a match becomes 0; summing yields the count.
  agg(sum(($"col2" =!= $"col2_next").cast("int")) as "col3")
scala> q.show
17/08/22 10:15:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+----+
| col2|col3|
+------+----+
|item_1| 2|
|item_2| 2|
+------+----+
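The warning above appears because Window.orderBy without partitionBy moves every row to a single partition. If your real dataset has a natural grouping key, partitioning the window by it keeps the work distributed. A minimal sketch, assuming a hypothetical group column and DataFrame (not present in the example data):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// "group" and dataWithGroup are assumptions for illustration only.
val byGroup = Window.partitionBy($"group").orderBy($"col1")

val qPartitioned = dataWithGroup.
  withColumn("col2_next",
    coalesce(lead($"col2", 1) over byGroup, $"col2")).
  groupBy($"group", $"col2").
  agg(sum(($"col2" =!= $"col2_next").cast("int")) as "col3")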