I created a DataFrame in Spark by grouping on column1 and date and summing the amount:
val table = df1.groupBy($"column1",$"date").sum("amount")
+-------+------+------+
|Column1|Date  |Amount|
+-------+------+------+
|A      |1-jul |1000  |
|A      |1-june|2000  |
|A      |1-May |2000  |
|A      |1-dec |3000  |
|A      |1-Nov |2000  |
|B      |1-jul |100   |
|B      |1-june|300   |
|B      |1-May |400   |
|B      |1-dec |300   |
+-------+------+------+
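For reference, something like this produces a DataFrame shaped like the table above (a minimal sketch; the raw values and column names are assumptions, and a SparkSession named spark is assumed to be in scope):
import spark.implicits._

// Toy input data; (A, 1-jul) appears twice just so the groupBy sum is visible
val df1 = Seq(
  ("A", "1-jul", 500), ("A", "1-jul", 500),
  ("A", "1-june", 2000), ("A", "1-May", 2000),
  ("A", "1-dec", 3000), ("A", "1-Nov", 2000),
  ("B", "1-jul", 100), ("B", "1-june", 300),
  ("B", "1-May", 400), ("B", "1-dec", 300)
).toDF("column1", "date", "amount")

val table = df1.groupBy($"column1", $"date").sum("amount")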
Now I want to add a new column with the difference between the amounts of any two dates from the table.
You can use a Window function if the calculation is fixed, such as computing the difference with the previous month, or with the month two rows back, etc. For that you can use the lag and lead functions with a Window specification.
But for that you first need to convert the date column as below so that it can be ordered.
+-------+------+--------------+------+
|Column1|Date |Date_Converted|Amount|
+-------+------+--------------+------+
|A |1-jul |2017-07-01 |1000 |
|A |1-june|2017-06-01 |2000 |
|A |1-May |2017-05-01 |2000 |
|A |1-dec |2017-12-01 |3000 |
|A |1-Nov |2017-11-01 |2000 |
|B |1-jul |2017-07-01 |100 |
|B |1-june|2017-06-01 |300 |
|B |1-May |2017-05-01 |400 |
|B |1-dec |2017-12-01 |300 |
+-------+------+--------------+------+
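One way to derive such a Date_Converted column is sketched below (an assumption-heavy sketch, not the only way: it assumes every value looks like "1-<month>", that all dates fall in 2017, and that the month spellings are limited to the ones in the sample data):
import org.apache.spark.sql.functions._

// Map the month spellings seen in the data to month numbers (assumption)
val monthNum = Map(
  "jan" -> 1, "feb" -> 2, "mar" -> 3, "apr" -> 4, "may" -> 5, "june" -> 6,
  "jul" -> 7, "aug" -> 8, "sep" -> 9, "oct" -> 10, "nov" -> 11, "dec" -> 12
)

// Turn "1-jul" into "2017-07-01" so it can be parsed and ordered
val toIsoDate = udf { d: String =>
  val Array(day, month) = d.split("-")
  f"2017-${monthNum(month.toLowerCase)}%02d-${day.toInt}%02d"
}

// df is the grouped DataFrame with an added, orderable date column
val df = table.withColumn("Date_Converted", to_date(toIsoDate($"Date")))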
You can find the difference between the previous month and the current month by doing
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted")
import org.apache.spark.sql.functions._
df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when((lag("Amount", 1).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 1).over(windowSpec)))
.show(false)
You should have
+-------+------+--------------+------+------------------------+
|Column1|Date |Date_Converted|Amount|diff_Amt_With_Prev_Month|
+-------+------+--------------+------+------------------------+
|B |1-May |2017-05-01 |400 |400.0 |
|B |1-june|2017-06-01 |300 |-100.0 |
|B |1-jul |2017-07-01 |100 |-200.0 |
|B |1-dec |2017-12-01 |300 |200.0 |
|A |1-May |2017-05-01 |2000 |2000.0 |
|A |1-june|2017-06-01 |2000 |0.0 |
|A |1-jul |2017-07-01 |1000 |-1000.0 |
|A |1-Nov |2017-11-01 |2000 |1000.0 |
|A |1-dec |2017-12-01 |3000 |1000.0 |
+-------+------+--------------+------+------------------------+
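The when/isNull guard above only handles the first row of each partition, where lag returns null. The same expression can be written a bit more compactly with coalesce; this is just an equivalent variant (reusing df and windowSpec from above), not a different result:
df.withColumn(
  "diff_Amt_With_Prev_Month",
  $"Amount" - coalesce(lag("Amount", 1).over(windowSpec), lit(0))
).show(false)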
You can increase the lag offset to 2 to get the difference with the month two rows back:
df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when((lag("Amount", 2).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 2).over(windowSpec)))
.show(false)
which will give you
+-------+------+--------------+------+----------------------------+
|Column1|Date |Date_Converted|Amount|diff_Amt_With_Prev_two_Month|
+-------+------+--------------+------+----------------------------+
|B |1-May |2017-05-01 |400 |400.0 |
|B |1-june|2017-06-01 |300 |300.0 |
|B |1-jul |2017-07-01 |100 |-300.0 |
|B |1-dec |2017-12-01 |300 |0.0 |
|A |1-May |2017-05-01 |2000 |2000.0 |
|A |1-june|2017-06-01 |2000 |2000.0 |
|A |1-jul |2017-07-01 |1000 |-1000.0 |
|A |1-Nov |2017-11-01 |2000 |0.0 |
|A |1-dec |2017-12-01 |3000 |2000.0 |
+-------+------+--------------+------+----------------------------+
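If you instead need the difference with the following month, lead works symmetrically (a sketch reusing df and windowSpec from above; this output is not shown here):
df.withColumn(
  "diff_Amt_With_Next_Month",
  $"Amount" - coalesce(lead("Amount", 1).over(windowSpec), lit(0))
).show(false)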
I hope the answer is helpful.