Cumulative sum in Spark

I want to do cumulative sum in Spark. Here is the register table (input):

+---------------+-------------------+----+----+----+
|     product_id|          date_time| ack|val1|val2|
+---------------+-------------------+----+----+----+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|
+---------------+-------------------+----+----+----+

Hive query:

select *,
       SUM(val1) over (partition by product_id, ack order by date_time
                       rows between unbounded preceding and current row) as val1_sum,
       SUM(val2) over (partition by product_id, ack order by date_time
                       rows between unbounded preceding and current row) as val2_sum
from test

Output:

+---------------+-------------------+----+----+----+-------+--------+
|     product_id|          date_time| ack|val1|val2|val_sum|val2_sum|
+---------------+-------------------+----+----+----+-------+--------+
|4008607333T.upf|2017-12-13:02:27:01|3-46|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:27:08|3-46|  53|  52|    106|     104|
|4008607333T.upf|2017-12-13:02:28:07|3-46|  15|   1|    121|     105|
|4008607333T.upf|2017-12-13:02:27:03|3-47|  53|  52|     53|      52|
|4008607333T.upf|2017-12-13:02:28:01|3-47|  53|  52|    106|     104|
+---------------+-------------------+----+----+----+-------+--------+

Using the following Spark logic, I get the same output as above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named spark is in scope (as in spark-shell); enables the 'col symbol syntax

// running-total window per (product_id, ack), ordered by date_time
val w = Window.partitionBy('product_id, 'ack).orderBy('date_time)

val newDf = inputDF
  .withColumn("val_sum", sum('val1) over w)
  .withColumn("val2_sum", sum('val2) over w)
newDf.show

However, when I run this logic on a Spark cluster, the val_sum value is sometimes half of the cumulative sum and sometimes differs in other ways. I don't know why this happens on the cluster. Is it due to the partitioning?

How can I compute the cumulative sum of a column on a Spark cluster?

asked Dec 18 '17 by lucy

1 Answer

To get the cumulative sum using the DataFrame API, you should use the rowsBetween window method. In Spark 2.1 and newer, create the window as follows:

val w = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

This tells Spark to use the values from the beginning of the partition up to and including the current row. On older versions of Spark, use rowsBetween(Long.MinValue, 0) for the same effect.
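
For reference, here is a minimal sketch of the equivalent window definition on pre-2.1 versions of Spark, with the same partitioning and ordering but the frame expressed via Long.MinValue and 0 (the variable name wOld is just illustrative):

import org.apache.spark.sql.expressions.Window

// Pre-2.1 equivalent frame: Long.MinValue stands for "unbounded preceding", 0 for "current row"
val wOld = Window.partitionBy($"product_id", $"ack")
  .orderBy($"date_time")
  .rowsBetween(Long.MinValue, 0)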

To apply the window, use the same method as before:

val newDf = inputDF.withColumn("val_sum", sum($"val1").over(w))
  .withColumn("val2_sum", sum($"val2").over(w))
answered Sep 21 '22 by Shaido