
Spark Scala: moving average for multiple columns

Input:

val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00,4),
                                ("Alice", "2016-05-03", 45.00,2),
                                ("Alice", "2016-05-04", 55.00,4),
                                ("Bob", "2016-05-01", 25.00,6),
                                ("Bob", "2016-05-04", 29.00,7),
                                ("Bob", "2016-05-06", 27.00,10))).
                           toDF("name", "date", "amountSpent","NumItems")

Procedure:

 // Import the window functions.
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._

 // Create a window spec.
 val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

In this window spec, the data is partitioned by customer, and each customer's data is ordered by date. The window frame starts at -1 (one row before the current row) and ends at 1 (one row after the current row), for a total of up to 3 rows in the sliding window. The problem is to compute a window-based sum for a list of columns. In this case they are "amountSpent" and "NumItems", but the real problem can involve hundreds of columns.
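To make the frame semantics concrete, here is a minimal plain-Scala sketch (no Spark involved) of what rowsBetween(-1, 1) computes for one partition that is already sorted by date. The helper name slidingSum is hypothetical and exists only for illustration; it is a model of the frame, not how Spark implements it.

```scala
// Plain-Scala model of a row-based frame: for each row, sum the values from
// `before` rows earlier to `after` rows later, clipped at the partition edges.
// `slidingSum` is a hypothetical helper, not part of the Spark API.
def slidingSum(values: Vector[Double], before: Int, after: Int): Vector[Double] =
  values.indices.map { i =>
    val from  = math.max(0, i - before)                  // clip at the first row
    val until = math.min(values.length, i + after + 1)   // clip at the last row
    values.slice(from, until).sum
  }.toVector

// Alice's amountSpent values, ordered by date.
val alice = Vector(50.0, 45.0, 55.0)
println(slidingSum(alice, 1, 1)) // prints Vector(95.0, 150.0, 100.0)
```

The first and last rows only see two values each, which is why their sums cover fewer than 3 rows.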

Below is a solution that computes the window-based sum one column at a time. Is there a more efficient way to do this, given that the rows in the sliding window should not need to be found again for every column?

 // Calculate the sum of spent
 customers.withColumn("sumSpent",sum(customers("amountSpent")).over(wSpec1)).show()

  +-----+----------+-----------+--------+--------+
  | name|      date|amountSpent|NumItems|sumSpent|
  +-----+----------+-----------+--------+--------+
  |Alice|2016-05-01|       50.0|       4|    95.0|
  |Alice|2016-05-03|       45.0|       2|   150.0|
  |Alice|2016-05-04|       55.0|       4|   100.0|
  |  Bob|2016-05-01|       25.0|       6|    54.0|
  |  Bob|2016-05-04|       29.0|       7|    81.0|
  |  Bob|2016-05-06|       27.0|      10|    56.0|
  +-----+----------+-----------+--------+--------+

 // Calculate the sum of items
 customers.withColumn("sumItems", sum(customers("NumItems")).over(wSpec1)).show()

  +-----+----------+-----------+--------+--------+
  | name|      date|amountSpent|NumItems|sumItems|
  +-----+----------+-----------+--------+--------+
  |Alice|2016-05-01|       50.0|       4|       6|
  |Alice|2016-05-03|       45.0|       2|      10|
  |Alice|2016-05-04|       55.0|       4|       6|
  |  Bob|2016-05-01|       25.0|       6|      13|
  |  Bob|2016-05-04|       29.0|       7|      23|
  |  Bob|2016-05-06|       27.0|      10|      17|
  +-----+----------+-----------+--------+--------+
HappyCoding asked Mar 09 '23 18:03

1 Answer

As far as I know, it is currently not possible to update multiple columns with a single window function call. You can, however, loop over the columns so that it behaves as if they were all updated together, as below:

val customers = sc.parallelize(List(("Alice", "2016-05-01", 50.00,4),
  ("Alice", "2016-05-03", 45.00,2),
  ("Alice", "2016-05-04", 55.00,4),
  ("Bob", "2016-05-01", 25.00,6),
  ("Bob", "2016-05-04", 29.00,7),
  ("Bob", "2016-05-06", 27.00,10))).
  toDF("name", "date", "amountSpent","NumItems")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Create a window spec.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
var tempdf = customers
val colNames = List("amountSpent", "NumItems")
for(column <- colNames){
  tempdf = tempdf.withColumn(column+"Sum", sum(tempdf(column)).over(wSpec1))
}
tempdf.show(false)

You should get the following output:

+-----+----------+-----------+--------+--------------+-----------+
|name |date      |amountSpent|NumItems|amountSpentSum|NumItemsSum|
+-----+----------+-----------+--------+--------------+-----------+
|Bob  |2016-05-01|25.0       |6       |54.0          |13         |
|Bob  |2016-05-04|29.0       |7       |81.0          |23         |
|Bob  |2016-05-06|27.0       |10      |56.0          |17         |
|Alice|2016-05-01|50.0       |4       |95.0          |6          |
|Alice|2016-05-03|45.0       |2       |150.0         |10         |
|Alice|2016-05-04|55.0       |4       |100.0         |6          |
+-----+----------+-----------+--------+--------------+-----------+
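As an alternative to reassigning a var in a loop, the windowed columns can be built as a list and added in a single select. The sketch below reuses the data, wSpec1 and colNames from above. The local SparkSession setup and the "Sum" suffix naming are assumptions for a standalone run (in spark-shell, spark already exists). Whether this produces a cheaper physical plan than the loop is not guaranteed and may depend on the Spark version, so the sketch prints the plan with explain() for inspection.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Local session for illustration only (assumption); spark-shell provides `spark`.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("window-sums")
  .getOrCreate()
import spark.implicits._

val customers = Seq(
  ("Alice", "2016-05-01", 50.00, 4),
  ("Alice", "2016-05-03", 45.00, 2),
  ("Alice", "2016-05-04", 55.00, 4),
  ("Bob", "2016-05-01", 25.00, 6),
  ("Bob", "2016-05-04", 29.00, 7),
  ("Bob", "2016-05-06", 27.00, 10)
).toDF("name", "date", "amountSpent", "NumItems")

// Same window spec as in the question: previous row, current row, next row.
val wSpec1 = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
val colNames = List("amountSpent", "NumItems")

// One windowed sum per column, all sharing the same window spec.
val windowedSums = colNames.map(c => sum(col(c)).over(wSpec1).as(c + "Sum"))

// A single select keeps the original columns and adds every new one at once.
val result = customers.select(col("*") +: windowedSums: _*)

result.show(false)
result.explain() // inspect how many Window operators the plan contains
```

The result has the same columns and values as the loop version; only the way the DataFrame is built differs, and row order in show() may vary.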
Ramesh Maharjan answered Mar 20 '23 19:03