Input:
val customers = sc.parallelize(List(
  ("Alice", "2016-05-01", 50.00, 4),
  ("Alice", "2016-05-03", 45.00, 2),
  ("Alice", "2016-05-04", 55.00, 4),
  ("Bob", "2016-05-01", 25.00, 6),
  ("Bob", "2016-05-04", 29.00, 7),
  ("Bob", "2016-05-06", 27.00, 10)
)).toDF("name", "date", "amountSpent", "NumItems")
Procedure:
// Import the window functions.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
In this window spec, the data is partitioned by customer, and each customer's data is ordered by date. The window frame starts at -1 (one row before the current row) and ends at 1 (one row after the current row), for a total of up to 3 rows in the sliding window. The problem is to compute a window-based sum for a list of columns. In this case they are "amountSpent" and "NumItems", but in general there can be hundreds of columns.
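To make the frame concrete, here is a minimal plain-Scala sketch (no Spark involved) of what rowsBetween(-1, 1) computes within a single partition. The helper name slidingSum and its parameters are hypothetical, introduced only for illustration.

```scala
// Illustration only: emulate a rowsBetween(-before, after) sum frame
// over the rows of ONE partition that are already ordered by date.
def slidingSum(values: Seq[Double], before: Int, after: Int): Seq[Double] =
  values.indices.map { i =>
    // Clamp the frame to the partition boundaries, as a row-based frame does.
    val from = math.max(0, i - before)
    val until = math.min(values.length, i + after + 1)
    values.slice(from, until).sum
  }

// amountSpent per customer, ordered by date (values taken from the input above).
val aliceSpent = Seq(50.0, 45.0, 55.0)
val bobSpent = Seq(25.0, 29.0, 27.0)

val aliceSums = slidingSum(aliceSpent, before = 1, after = 1) // 95.0, 150.0, 100.0
val bobSums = slidingSum(bobSpent, before = 1, after = 1)     // 54.0, 81.0, 56.0
```

The first and last rows of each partition only have two rows in their frame, which is why their sums cover two values rather than three.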
Below is a solution that computes the window-based sum one column at a time. Is there a more efficient way to do this, given that the rows in the sliding window should not need to be located again for each column?
// Calculate the sum of spent
customers.withColumn("sumSpent",sum(customers("amountSpent")).over(wSpec1)).show()
+-----+----------+-----------+--------+--------+
| name|      date|amountSpent|NumItems|sumSpent|
+-----+----------+-----------+--------+--------+
|Alice|2016-05-01|       50.0|       4|    95.0|
|Alice|2016-05-03|       45.0|       2|   150.0|
|Alice|2016-05-04|       55.0|       4|   100.0|
|  Bob|2016-05-01|       25.0|       6|    54.0|
|  Bob|2016-05-04|       29.0|       7|    81.0|
|  Bob|2016-05-06|       27.0|      10|    56.0|
+-----+----------+-----------+--------+--------+
// Calculate the sum of items
customers.withColumn( "sumItems",
sum(customers("NumItems")).over(wSpec1) ).show()
+-----+----------+-----------+--------+--------+
| name|      date|amountSpent|NumItems|sumItems|
+-----+----------+-----------+--------+--------+
|Alice|2016-05-01|       50.0|       4|       6|
|Alice|2016-05-03|       45.0|       2|      10|
|Alice|2016-05-04|       55.0|       4|       6|
|  Bob|2016-05-01|       25.0|       6|      13|
|  Bob|2016-05-04|       29.0|       7|      23|
|  Bob|2016-05-06|       27.0|      10|      17|
+-----+----------+-----------+--------+--------+
As far as I know, it's currently not possible to compute multiple columns with a single window function call. You can, however, get the same effect by looping over the columns, as below:
val customers = sc.parallelize(List(
  ("Alice", "2016-05-01", 50.00, 4),
  ("Alice", "2016-05-03", 45.00, 2),
  ("Alice", "2016-05-04", 55.00, 4),
  ("Bob", "2016-05-01", 25.00, 6),
  ("Bob", "2016-05-04", 29.00, 7),
  ("Bob", "2016-05-06", 27.00, 10)
)).toDF("name", "date", "amountSpent", "NumItems")
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
// Start from the original DataFrame and add one "<column>Sum" column per input column.
var tempdf = customers
val colNames = List("amountSpent", "NumItems")
for (column <- colNames) {
  tempdf = tempdf.withColumn(column + "Sum", sum(tempdf(column)).over(wSpec1))
}
tempdf.show(false)
You should get the following output:
+-----+----------+-----------+--------+--------------+-----------+
|name |date      |amountSpent|NumItems|amountSpentSum|NumItemsSum|
+-----+----------+-----------+--------+--------------+-----------+
|Bob  |2016-05-01|25.0       |6       |54.0          |13         |
|Bob  |2016-05-04|29.0       |7       |81.0          |23         |
|Bob  |2016-05-06|27.0       |10      |56.0          |17         |
|Alice|2016-05-01|50.0       |4       |95.0          |6          |
|Alice|2016-05-03|45.0       |2       |150.0         |10         |
|Alice|2016-05-04|55.0       |4       |100.0         |6          |
+-----+----------+-----------+--------+--------------+-----------+
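A possible alternative to the mutable loop is to build all the window expressions up front and pass them to a single select. The sketch below is not part of the original answer. It assumes a local SparkSession (in spark-shell, getOrCreate() should simply return the existing one), and the names sumCols and result are made up for illustration. My understanding is that Spark can evaluate window expressions that share the same partitioning and ordering in one Window operator, but that is worth confirming with explain() on your own Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Assumption: a local session for a standalone run.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("multi-column-window-sum")
  .getOrCreate()
import spark.implicits._

val customers = Seq(
  ("Alice", "2016-05-01", 50.00, 4),
  ("Alice", "2016-05-03", 45.00, 2),
  ("Alice", "2016-05-04", 55.00, 4),
  ("Bob", "2016-05-01", 25.00, 6),
  ("Bob", "2016-05-04", 29.00, 7),
  ("Bob", "2016-05-06", 27.00, 10)
).toDF("name", "date", "amountSpent", "NumItems")

// Same window spec as in the question.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

// One window expression per column, all sharing wSpec1.
val colNames = List("amountSpent", "NumItems")
val sumCols = colNames.map(c => sum(col(c)).over(wSpec1).as(c + "Sum"))

// Keep every original column and append all the sums in a single projection.
val result = customers.select(col("*") +: sumCols: _*)

result.show(false)
result.explain() // inspect the physical plan to see how the window is evaluated
```

With hundreds of columns, colNames could be derived from customers.columns by filtering out the key columns. Building one select also avoids stacking up one projection per withColumn call in the logical plan.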