Compare Value of Current and Previous Row in Spark

I am trying to compare the current row with the previous row in the DataFrame below, and recalculate the AMOUNT column based on that comparison.

scala> val dataset = sc.parallelize(Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))).toDF("SL_NO","ID","AMOUNT")

scala> dataset.show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    70|
|    4|789|    80|
+-----+---+------+

Calculation Logic:

  1. For row no. 1, AMOUNT stays 50, taken from the first row.
  2. For row no. 2, if the ID of SL_NO 2 differs from that of SL_NO 1, keep the AMOUNT of SL_NO 2 (i.e. 30); otherwise take the AMOUNT of SL_NO 1 (i.e. 50).
  3. For row no. 3, if the ID of SL_NO 3 differs from that of SL_NO 2, keep the AMOUNT of SL_NO 3 (i.e. 70); otherwise take the AMOUNT of SL_NO 2 (i.e. 30).

The same logic applies to the remaining rows (a plain-Scala sketch of this logic follows the expected output below).

Expected Output:

+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+
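
For reference, here is a plain-Scala sketch (no Spark) of that row-by-row logic applied to the sample data, assuming the rows are already ordered by SL_NO; the value names are illustrative only:

// Compare each row with the previous one: same ID -> take the previous AMOUNT,
// different ID (or first row) -> keep the current AMOUNT.
val rows = Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))

val adjusted = rows.zipWithIndex.map {
  case ((slNo, id, amount), 0) => (slNo, id, amount)   // first row keeps its AMOUNT
  case ((slNo, id, amount), i) =>
    val (_, prevId, prevAmount) = rows(i - 1)
    val newAmount = if (id == prevId) prevAmount else amount
    (slNo, id, newAmount)
}
// adjusted: Seq((1,123,50), (2,456,30), (3,456,30), (4,789,80))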

Please help.

asked Sep 13 '17 by Avijit



1 Answer

You could use lag together with when/otherwise; here is a demonstration:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

// Window over the whole DataFrame, ordered by SL_NO (no partitioning)
val w = Window.orderBy($"SL_NO")

// If the previous row has the same ID, take its AMOUNT; otherwise keep the current AMOUNT
dataset.withColumn("AMOUNT",
    when($"ID" === lag($"ID", 1).over(w), lag($"AMOUNT", 1).over(w)).otherwise($"AMOUNT")
).show

+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+

Note: since this example doesn't use any partitioning, it could have performance problems. With your real data it would help if the computation can be partitioned by some column, perhaps Window.partitionBy($"ID").orderBy($"SL_NO"), depending on your actual problem and on whether rows with the same ID are sorted together.
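
For example, if rows sharing an ID are always contiguous in SL_NO order, a partitioned window like the sketch below produces the same result while letting Spark process each ID group independently (this continues the spark-shell session above; whether partitioning by ID preserves your intended comparison across ID boundaries is an assumption you would need to verify on your real data):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

// Partition by ID so each group is sorted and processed independently
// instead of collecting everything into a single task.
val wp = Window.partitionBy($"ID").orderBy($"SL_NO")

// Within a partition all IDs match, so every row after the first takes the
// previous row's AMOUNT; the first row of each group keeps its own AMOUNT.
dataset.withColumn("AMOUNT",
    when($"ID" === lag($"ID", 1).over(wp), lag($"AMOUNT", 1).over(wp)).otherwise($"AMOUNT")
).show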

answered Oct 19 '22 by Psidom