I am trying to compare each row's record with the previous row's in the DataFrame below, in order to calculate the AMOUNT column.
scala> val dataset = sc.parallelize(Seq((1, 123, 50), (2, 456, 30), (3, 456, 70), (4, 789, 80))).toDF("SL_NO","ID","AMOUNT")
scala> dataset.show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    70|
|    4|789|    80|
+-----+---+------+
Calculation Logic:
If the current row's ID is the same as the previous row's ID, the current row's AMOUNT should be replaced with the previous row's AMOUNT (e.g. SL_NO 3 takes 30 from SL_NO 2, since both have ID 456). The same logic needs to be followed for the other rows as well.
Expected Output:
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+
Please help.
You could use lag with when.otherwise; here is a demonstration:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

// Order all rows by SL_NO so lag() refers to the immediately preceding row.
val w = Window.orderBy($"SL_NO")
dataset.withColumn("AMOUNT",
  // Carry the previous row's AMOUNT forward when the ID repeats; otherwise keep it.
  when($"ID" === lag($"ID", 1).over(w), lag($"AMOUNT", 1).over(w)).otherwise($"AMOUNT")
).show
+-----+---+------+
|SL_NO| ID|AMOUNT|
+-----+---+------+
|    1|123|    50|
|    2|456|    30|
|    3|456|    30|
|    4|789|    80|
+-----+---+------+
Note: since this example doesn't use any partitioning, Spark moves all rows into a single partition to compute the window, which can cause performance problems on real data. It would help if your problem can be partitioned by some column, e.g. Window.partitionBy($"ID").orderBy($"SL_NO") as in the sketch below, depending on your actual problem and on whether rows with the same ID appear consecutively.
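For example, a minimal sketch of the partitioned variant (wPart is just an illustrative name; this assumes rows sharing an ID are consecutive in SL_NO order, otherwise the result can differ from the unpartitioned version):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, when}

// Each ID becomes its own partition, so Spark can distribute the window
// computation instead of collecting every row into a single partition.
val wPart = Window.partitionBy($"ID").orderBy($"SL_NO")
dataset.withColumn("AMOUNT",
  when($"ID" === lag($"ID", 1).over(wPart), lag($"AMOUNT", 1).over(wPart))
    .otherwise($"AMOUNT") // first row of each ID has no lag, so it keeps its AMOUNT
).show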