 

Python Spark Cumulative Sum by Group Using DataFrame

How do I compute a cumulative sum per group in PySpark, using the DataFrame API specifically?

With an example dataset as follows:

df = sqlContext.createDataFrame( [(1,2,"a"),(3,2,"a"),(1,3,"b"),(2,2,"a"),(2,3,"b")], 
                                 ["time", "value", "class"] )

+----+-----+-----+
|time|value|class|
+----+-----+-----+
|   1|    2|    a|
|   3|    2|    a|
|   1|    3|    b|
|   2|    2|    a|
|   2|    3|    b|
+----+-----+-----+

I would like to add a column containing the cumulative sum of value for each class, ordered by the time variable.

asked Aug 29 '17 by mr kw

3 Answers

This can be done using a combination of a window function and the Window.unboundedPreceding value in the window's range as follows:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
|   1|    3|    b|      3|
|   2|    3|    b|      6|
|   1|    2|    a|      2|
|   2|    2|    a|      4|
|   3|    2|    a|      6|
+----+-----+-----+-------+
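As a plain-Python sanity check of the expected result (no Spark required), the same per-class running totals can be reproduced with a dictionary of running sums. The variable names (`running`, `cum`) are just illustrative, not Spark API:

```python
from collections import defaultdict

rows = [(1, 2, "a"), (3, 2, "a"), (1, 3, "b"), (2, 2, "a"), (2, 3, "b")]

# Sort by class then time, mirroring partitionBy('class').orderBy('time')
running = defaultdict(int)
cum = []
for time, value, cls in sorted(rows, key=lambda r: (r[2], r[0])):
    running[cls] += value
    cum.append((time, value, cls, running[cls]))

print(cum)
# [(1, 2, 'a', 2), (2, 2, 'a', 4), (3, 2, 'a', 6), (1, 3, 'b', 3), (2, 3, 'b', 6)]
```

These totals match the `cum_sum` column in the output above.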
answered Sep 25 '22 by mr kw


I have tried this way and it worked for me.

from pyspark.sql import Window
from pyspark.sql import functions as f
import sys

cum_sum = df.withColumn('cumsum', f.sum('value').over(
    Window.partitionBy('class').orderBy('time').rowsBetween(-sys.maxsize, 0)))
cum_sum.show()
answered Sep 24 '22 by Anubhav Raj


To update the previous answers: the correct and precise way to do this is:

from pyspark.sql import Window
from pyspark.sql import functions as F

windowval = (Window.partitionBy('class').orderBy('time')
             .rowsBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
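Note that `rowsBetween` and `rangeBetween` agree on this dataset only because the `time` values are distinct within each class. With duplicate ordering values, `rangeBetween` includes every tied row in the frame, while `rowsBetween` advances one physical row at a time. The difference can be sketched in plain Python (illustrative only, not Spark API):

```python
# (time, value) pairs within one class; time=1 appears twice (a tie)
rows = [(1, 2), (1, 5), (2, 3)]

# rowsBetween(unboundedPreceding, 0): one running total per physical row
rows_cum = []
total = 0
for t, v in rows:
    total += v
    rows_cum.append(total)
# rows_cum == [2, 7, 10]

# rangeBetween(unboundedPreceding, 0): sum over all rows with time <= current
range_cum = [sum(v2 for t2, v2 in rows if t2 <= t) for t, v in rows]
# range_cum == [7, 7, 10]  -- both tied rows see the full total for time=1
```

So for a strict row-by-row cumulative sum in the presence of ties, `rowsBetween` is the frame you want.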
answered Sep 22 '22 by vegetarianCoder