
Applying a Window function to calculate differences in pySpark

I am using pySpark, and have set up my dataframe with two columns representing a daily asset price as follows:

    ind = sc.parallelize(range(1,5))
    prices = sc.parallelize([33.3,31.1,51.2,21.3])
    data = ind.zip(prices)
    df = sqlCtx.createDataFrame(data,["day","price"])

Calling df.show() gives:

    +---+-----+
    |day|price|
    +---+-----+
    |  1| 33.3|
    |  2| 31.1|
    |  3| 51.2|
    |  4| 21.3|
    +---+-----+

Which is fine and all. I would like to have another column that contains the day-to-day returns of the price column, i.e., something like


After much research, I am told that this is most efficiently accomplished through applying the pyspark.sql.window functions, but I am unable to see how.

Thomas Moore asked Apr 19 '16 17:04

People also ask

What does window function do in Pyspark?

PySpark window functions perform operations such as rank and row number over a group, frame, or collection of rows and return a result for each row individually. They are also increasingly used for data transformations.

How is Pyspark time difference calculated?

A timestamp difference in PySpark can be calculated by 1) using unix_timestamp() to get each time in seconds and subtracting one from the other, or 2) casting the TimestampType columns to LongType and subtracting the two long values to get the difference in seconds, dividing it by 60 to get the difference in minutes, and finally ...
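
Both approaches come down to the same arithmetic: turn each timestamp into epoch seconds, subtract, then divide. Here is a plain-Python sketch of that arithmetic using the standard datetime module instead of Spark. The two timestamps are made-up sample values, not data from the question.

```python
from datetime import datetime, timezone

# Hypothetical sample timestamps (UTC), chosen only for illustration.
start = datetime(2016, 4, 19, 17, 4, 0, tzinfo=timezone.utc)
end = datetime(2016, 4, 19, 18, 34, 30, tzinfo=timezone.utc)

# Epoch seconds: the same kind of quantity unix_timestamp() produces in PySpark.
start_s = int(start.timestamp())
end_s = int(end.timestamp())

# Difference in seconds, then in minutes.
diff_seconds = end_s - start_s
diff_minutes = diff_seconds / 60
```

In PySpark the subtraction would be applied to whole columns rather than to two scalar values.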

How do you use lead function in Pyspark?

The lead window function returns the value that is offset rows after the current row, or the default value if fewer than offset rows follow the current row. For example, an offset of one returns the next row at any given point in the window partition. This is equivalent to the LEAD function in SQL.
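
To make the offset and default behaviour concrete, here is a plain-Python emulation over an already ordered list. The helper names `lead` and `lag` below are hypothetical stand-ins written for this sketch, not the PySpark functions themselves, and they assume a non-negative offset. `lag` is included because it is the mirror image used in the answers below.

```python
def lead(values, offset=1, default=None):
    """Emulation only: the value `offset` rows after each row, else `default`."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default for i in range(n)]


def lag(values, offset=1, default=None):
    """Emulation only: the value `offset` rows before each row, else `default`."""
    n = len(values)
    return [values[i - offset] if i - offset >= 0 else default for i in range(n)]


# Prices from the question, already ordered by day.
prices = [33.3, 31.1, 51.2, 21.3]

next_prices = lead(prices)  # last row has no following row, so it gets the default
prev_prices = lag(prices)   # first row has no preceding row, so it gets the default
```

In PySpark itself the ordering comes from the window specification rather than from list position.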

2 Answers

You can bring in the previous day's price with the lag function, then add a column that computes the day-to-day return from the two columns. You have to tell Spark how to partition and order the data so that lag knows which row is the "previous" one, something like this:

    from pyspark.sql.window import Window
    import pyspark.sql.functions as func
    from pyspark.sql.functions import lit

    # Add a constant column to partition on (all rows belong to one series)
    dfu = df.withColumn('user', lit('tmoore'))

    # lag needs an ordered window (recent Spark versions reject an unordered one),
    # so order by day within the partition
    df_lag = dfu.withColumn('prev_day_price',
                            func.lag(dfu['price'])
                                .over(Window.partitionBy("user").orderBy("day")))

    # Day-to-day change relative to the current day's price
    result = df_lag.withColumn('daily_return',
               (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'])

    >>> result.show()
    +---+-----+-------+--------------+--------------------+
    |day|price|   user|prev_day_price|        daily_return|
    +---+-----+-------+--------------+--------------------+
    |  1| 33.3| tmoore|          null|                null|
    |  2| 31.1| tmoore|          33.3|-0.07073954983922816|
    |  3| 51.2| tmoore|          31.1|         0.392578125|
    |  4| 21.3| tmoore|          51.2|  -1.403755868544601|
    +---+-----+-------+--------------+--------------------+

Here is a longer introduction to window functions in Spark.
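
If it helps to see what partitioning and ordering mean for lag, here is a plain-Python emulation, not Spark code. The `asset` key, the second asset "B" and the helper `lag_by_partition` are all invented for this sketch. The rows are deliberately out of order to show that the ordering decides which row counts as "previous".

```python
from collections import defaultdict

# Hypothetical rows: (asset, day, price). Asset "B" is invented for illustration.
rows = [
    ("A", 2, 31.1), ("B", 1, 10.0), ("A", 1, 33.3),
    ("B", 2, 12.5), ("A", 3, 51.2),
]


def lag_by_partition(rows):
    """Emulate lag(price) over (partition by asset, order by day)."""
    groups = defaultdict(list)
    for asset, day, price in rows:
        groups[asset].append((day, price))

    previous = {}
    for asset, items in groups.items():
        items.sort()  # order by day within each partition
        prev = None   # the first row of a partition has no previous price
        for day, price in items:
            previous[(asset, day)] = prev
            prev = price
    return previous


prev_price = lag_by_partition(rows)
```

Each asset's first day gets no previous price, and prices never leak from one asset into another, which is the effect partitioning has.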

Oleksiy answered Oct 23 '22 21:10


The lag function can handle this use case.

    from pyspark.sql.window import Window
    import pyspark.sql.functions as func

    # Define the window: order rows by day
    window_spec = Window.orderBy("day")

    # Previous day's price for each row
    prev_day_price = df.withColumn('prev_day_price',
                                   func.lag(df['price'])
                                       .over(window_spec))

    # Calculate the daily return
    result = prev_day_price.withColumn('daily_return',
               (prev_day_price['price'] - prev_day_price['prev_day_price']) / prev_day_price['price'])
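
One thing worth checking is the denominator. The code in both answers divides the price change by the current day's price. If a conventional simple return is what is wanted (an assumption about the asker's intent), the previous day's price would be the denominator instead. A plain-Python sketch of the two calculations on the question's prices:

```python
# Prices from the question, ordered by day.
prices = [33.3, 31.1, 51.2, 21.3]

# Pair each day's price with the previous day's price (what lag provides).
pairs = list(zip(prices[:-1], prices[1:]))

# Denominator as in the answers' code: the current day's price.
over_current = [(cur - prev) / cur for prev, cur in pairs]

# Conventional simple return: the previous day's price as the denominator.
over_previous = [(cur - prev) / prev for prev, cur in pairs]
```

In the PySpark code, switching to the second form would mean dividing by the prev_day_price column instead of the price column.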
Sushmita Konar answered Oct 23 '22 21:10