How do I compute the cumulative sum per group in PySpark, specifically using the DataFrame abstraction?
With an example dataset as follows:
df = sqlContext.createDataFrame(
    [(1, 2, "a"), (3, 2, "a"), (1, 3, "b"), (2, 2, "a"), (2, 3, "b")],
    ["time", "value", "class"]
)
+----+-----+-----+
|time|value|class|
+----+-----+-----+
|   1|    2|    a|
|   3|    2|    a|
|   1|    3|    b|
|   2|    2|    a|
|   2|    3|    b|
+----+-----+-----+
I would like to add a cumulative sum column of value for each class, grouping over the (ordered) time variable.
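For the dataset above, the expected result would be something like:
+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
|   1|    2|    a|      2|
|   2|    2|    a|      4|
|   3|    2|    a|      6|
|   1|    3|    b|      3|
|   2|    3|    b|      6|
+----+-----+-----+-------+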
This can be done using a combination of a window function and the Window.unboundedPreceding value in the window's range as follows:
from pyspark.sql import Window
from pyspark.sql import functions as F

# running sum of value within each class, ordered by time
windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
+----+-----+-----+-------+
|time|value|class|cum_sum|
+----+-----+-----+-------+
|   1|    3|    b|      3|
|   2|    3|    b|      6|
|   1|    2|    a|      2|
|   2|    2|    a|      4|
|   3|    2|    a|      6|
+----+-----+-----+-------+
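Note that rangeBetween frames the window by the value of the ordering column, so rows that share the same time are treated as peers and receive the same cumulative sum, whereas rowsBetween (used in the answers below) advances the sum one physical row at a time. A minimal sketch of the difference, assuming the same sqlContext as above and a hypothetical df_dup with a duplicated time value:
from pyspark.sql import Window
from pyspark.sql import functions as F

# hypothetical dataset with a duplicated time value within class 'a'
df_dup = sqlContext.createDataFrame(
    [(1, 2, "a"), (2, 2, "a"), (2, 5, "a")],
    ["time", "value", "class"]
)

w_range = (Window.partitionBy('class').orderBy('time')
           .rangeBetween(Window.unboundedPreceding, 0))
w_rows = (Window.partitionBy('class').orderBy('time')
          .rowsBetween(Window.unboundedPreceding, 0))

df_dup.select(
    'time', 'value',
    F.sum('value').over(w_range).alias('range_cum_sum'),  # tied rows share one total
    F.sum('value').over(w_rows).alias('rows_cum_sum')     # strictly row-by-row
).show()
# Both time=2 rows get range_cum_sum = 9, while their rows_cum_sum values differ
# (and the order among tied rows is not deterministic).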
I have tried it this way and it worked for me:
from pyspark.sql import Window
from pyspark.sql import functions as f
import sys

# -sys.maxsize preceding rows effectively means "unbounded preceding"
cum_sum = df.withColumn(
    'cumsum',
    f.sum('value').over(Window.partitionBy('class')
                        .orderBy('time')
                        .rowsBetween(-sys.maxsize, 0))
)
cum_sum.show()
To update the previous answers: the correct and precise way to do this is:
from pyspark.sql import Window
from pyspark.sql import functions as F
windowval = (Window.partitionBy('class').orderBy('time')
.rowsBetween(Window.unboundedPreceding, 0))
df_w_cumsum = df.withColumn('cum_sum', F.sum('value').over(windowval))
df_w_cumsum.show()
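If you prefer SQL syntax, the same window can be written as a raw expression; a minimal sketch, assuming Spark 2.x where F.expr() accepts an OVER clause:
from pyspark.sql import functions as F

# the same cumulative sum, written as a SQL window expression
df_w_cumsum = df.withColumn(
    'cum_sum',
    F.expr("sum(value) OVER (PARTITION BY class ORDER BY time "
           "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)")
)
df_w_cumsum.show()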