
How to calculate cumulative sum using sqlContext

I know we can use a Window function in PySpark to calculate a cumulative sum. But Window is only supported in HiveContext, not in SQLContext. I need to use SQLContext because HiveContext cannot be run in multiple processes.

Is there an efficient way to calculate a cumulative sum using SQLContext? A simple way is to load the data into the driver's memory and use numpy.cumsum, but the drawback is that the data needs to fit into memory.

Michael asked Dec 02 '22 15:12


1 Answer

I'm not sure this is what you are looking for, but here are two examples of how to use sqlContext to calculate the cumulative sum:

First, when you want to partition by a category:

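from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import SQLContext

# sc is assumed to be an existing SparkContext (the pyspark shell provides one)
sqlContext = SQLContext(sc)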

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")

df2.collect()

Output:

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

Second, when you only want the cumulative sum of one variable across the whole table, change df2 to this:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")

df2.collect()

Output:

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]
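
To see why the rows come out in this order, the window's ORDER BY revenue, category and the running sum can be mimicked in plain Python on the same sample data. This is only a sanity check, not how Spark executes the query, and the variable names are made up for illustration:

```python
from itertools import accumulate

# Same sample data as in the answer: (category, revenue)
rows = [
    ("Tablet", 6500),
    ("Tablet", 5500),
    ("Cell Phone", 6000),
    ("Cell Phone", 6500),
    ("Cell Phone", 5500),
]

# Mirror "ORDER BY revenue, category": sort by revenue first, then category
ordered = sorted(rows, key=lambda row: (row[1], row[0]))

# Running total of revenue in that order
totals = accumulate(revenue for _, revenue in ordered)

result = [
    (category, revenue, total)
    for (category, revenue), total in zip(ordered, totals)
]

for line in result:
    print(line)
```

Including category in the ordering breaks the ties between equal revenues, so every row gets its own running total.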

Hope this helps. Collecting the data and then using np.cumsum is not very efficient, especially if the dataset is large. Another option you could explore is simple RDD transformations: use groupByKey() to group the values by some key, use map to calculate the cumulative sum within each group, and then reduce the results at the end.
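
A rough sketch of the RDD idea, assuming an RDD of (category, revenue) pairs like the rdd defined above. The helper names running_totals and cumsum_by_key are made up for this example, and it uses flatMap rather than map plus a final reduce so that each input row yields one output row:

```python
from itertools import accumulate


def running_totals(key, values):
    """Yield (key, value, cumsum) tuples for one group, ordered by value."""
    ordered = sorted(values)
    for value, total in zip(ordered, accumulate(ordered)):
        yield (key, value, total)


def cumsum_by_key(rdd):
    """Per-key cumulative sum for an RDD of (key, value) pairs.

    Assumption: rdd is a pyspark RDD. groupByKey() gathers all values of
    one key on a single executor, so each group (not the whole dataset)
    has to fit in memory.
    """
    return rdd.groupByKey().flatMap(
        lambda kv: running_totals(kv[0], kv[1])
    )
```

With the sample data, cumsum_by_key(rdd).collect() should give the same per-category running totals as the first SQL query, although the order of the groups in the collected list is not guaranteed.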

Dat Tran answered Dec 05 '22 05:12