I am using pySpark, and have set up my dataframe with two columns representing a daily asset price as follows:
ind = sc.parallelize(range(1, 5))
prices = sc.parallelize([33.3, 31.1, 51.2, 21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data, ["day", "price"])
Upon applying df.show(), I get:
+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+
Which is fine and all. I would like to have another column that contains the day-to-day returns of the price column, i.e., something like
(price(day2)-price(day1))/(price(day1))
After much research, I am told that this is most efficiently accomplished by applying the pyspark.sql.window functions, but I am unable to see how.
A PySpark window function performs an operation such as rank, row number, lag, or lead over a group (frame) of rows and returns a result for each row individually, which makes it a natural fit for this kind of day-to-day transformation.
The lead window function returns the value offset rows after the current row (and default if there are fewer than offset rows after it). For example, an offset of one returns the next row at any given point in the window partition; this is equivalent to the LEAD function in SQL. lag is its mirror image and looks at the previous row instead.
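To make that concrete, here is a minimal sketch (mine, not from either answer below) that applies lead and lag to the df built above, assuming the same sqlCtx session; ordering the window by day is what makes "next" and "previous" well defined:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# order by day so lead/lag know which row is "next" and which is "previous"
w = Window.orderBy("day")

df.withColumn("next_price", func.lead(df["price"]).over(w)) \
  .withColumn("prev_price", func.lag(df["price"]).over(w)) \
  .show()

The first row gets a null prev_price and the last row a null next_price, since there is nothing to look back or ahead to.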
You can bring in the previous day's price by using the lag function, and then add another column that computes the actual day-to-day return from the two columns, but you may have to tell Spark how to partition your data and/or order it to do the lag, something like this:
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

dfu = df.withColumn('user', lit('tmoore'))

df_lag = dfu.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                            .over(Window.partitionBy("user")))

result = df_lag.withColumn('daily_return',
                           (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'])

>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+---+-----+-------+--------------+--------------------+
Here is a longer introduction to Window functions in Spark.
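Note that the answer above divides by the current day's price; if you want the return exactly as defined in the question, (price(day2)-price(day1))/price(day1), divide by prev_day_price instead. A small variant of the same code, with the window also ordered by day (newer Spark versions refuse to evaluate lag over an unordered window):

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# ordered window; lag needs an ordering in recent Spark releases
w = Window.partitionBy("user").orderBy("day")

returns = (dfu
           .withColumn("prev_day_price", func.lag(dfu["price"]).over(w))
           # divide by the previous day's price, as in the question's formula
           .withColumn("daily_return",
                       (func.col("price") - func.col("prev_day_price"))
                       / func.col("prev_day_price")))
returns.show()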
The lag function can help you resolve your use case.
from pyspark.sql.window import Window
import pyspark.sql.functions as func

### Defining the window
Windowspec = Window.orderBy("day")

### Calculating the lag of price at each day level
prev_day_price = df.withColumn('prev_day_price',
                               func.lag(df['price'])
                                   .over(Windowspec))

### Calculating the day-to-day return
result = prev_day_price.withColumn('daily_return',
                                   (prev_day_price['price'] - prev_day_price['prev_day_price']) /
                                   prev_day_price['price'])
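One caveat with Window.orderBy("day") and no partitionBy: Spark pulls all rows into a single partition to evaluate the window (and logs a warning to that effect), which is harmless here but won't scale. If your frame held several assets, you could partition by an asset identifier instead; a sketch with a hypothetical asset column and toy data:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# hypothetical multi-asset frame with columns asset, day, price
multi = sqlCtx.createDataFrame(
    [("AAA", 1, 33.3), ("AAA", 2, 31.1),
     ("BBB", 1, 51.2), ("BBB", 2, 21.3)],
    ["asset", "day", "price"])

# one window per asset, ordered by day within each asset
w = Window.partitionBy("asset").orderBy("day")

multi = (multi
         .withColumn("prev_day_price", func.lag("price").over(w))
         .withColumn("daily_return",
                     (func.col("price") - func.col("prev_day_price"))
                     / func.col("prev_day_price")))
multi.show()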