I have a Spark dataframe with five columns: group, date, a, b, and c. I want to do the following:
Given df:

group  date     a  b  c
a      2018-01  2  3  10
a      2018-02  4  5  null
a      2018-03  2  1  null
Expected output:

group  date     a  b  c
a      2018-01  2  3  10
a      2018-02  4  5  10*3+2 = 32
a      2018-03  2  1  32*5+4 = 164
For each group, each missing c should be calculated from the previous row as c * b + a (using the previous row's values), and that result then feeds the next row's calculation. I tried using lag and a window function, but couldn't find the right way to do this.
Within a window you cannot access the results of a column that you are currently calculating; this would force Spark to do the calculations sequentially and should be avoided. Another approach is to transform the recursive calculation c_n = func(c_(n-1)), here c_n = c_(n-1) * b_(n-1) + a_(n-1), into a formula that only uses the (constant) values of a and b and the first value of c:

c_n = c_1 * prod(b_1 .. b_(n-1)) + sum_(i=1..n-1) a_i * prod(b_(i+1) .. b_(n-1))
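As a quick sanity check, the closed formula can be verified against the recursion in plain Python, using the sample values from the question (this check is illustrative and not part of the Spark solution; math.prod requires Python 3.8+):

a = [2, 4, 2]
b = [3, 5, 1]
c = [10.0]

# Recursion: c_n = c_(n-1) * b_(n-1) + a_(n-1)
for i in range(1, len(a)):
    c.append(c[-1] * b[i - 1] + a[i - 1])

# Closed formula for c_n
import math
def closed(n):
    e1 = c[0] * math.prod(b[:n - 1])
    e2 = sum(a[i] * math.prod(b[i + 1:n - 1]) for i in range(n - 1))
    return e1 + e2

print(c)                                 # [10.0, 32.0, 164.0]
print([closed(n) for n in range(1, 4)])  # [10.0, 32.0, 164.0]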
All input values for this formula can be collected with a window, and the formula itself is implemented as a UDF:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window

df = ...

w = Window.partitionBy('group').orderBy('date')

# For each row, collect the history of a and b so far and the group's first c
df1 = df.withColumn("la", F.collect_list("a").over(w)) \
    .withColumn("lb", F.collect_list("b").over(w)) \
    .withColumn("c0", F.first("c").over(w))
import numpy as np

def calc_c(c0, a, b):
    # No starting value for the group
    if c0 is None:
        return 0.0
    # First row of the group: c is simply the initial value
    if len(a) == 1:
        return float(c0)
    # e1 = c_1 * prod(b_1 .. b_(n-1))
    e1 = c0 * np.prod(b[:-1])
    # e2 = sum of a_i * prod(b_(i+1) .. b_(n-1))
    e2 = 0.0
    for i, an in enumerate(a[:-1]):
        e2 = e2 + an * np.prod(b[i + 1:-1])
    return float(e1 + e2)

calc_c_udf = F.udf(calc_c, T.DoubleType())

df1.withColumn("result", calc_c_udf("c0", "la", "lb")) \
    .show()
Output:
+-----+-------+---+---+----+---------+---------+---+------+
|group| date| a| b| c| la| lb| c0|result|
+-----+-------+---+---+----+---------+---------+---+------+
| a|2018-01| 2| 3| 10| [2]| [3]| 10| 10.0|
| a|2018-02| 4| 5|null| [2, 4]| [3, 5]| 10| 32.0|
| a|2018-03| 2| 1|null|[2, 4, 2]|[3, 5, 1]| 10| 164.0|
+-----+-------+---+---+----+---------+---------+---+------+
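As a side note, on Spark 2.4+ the same fold can be expressed with the built-in aggregate higher-order function instead of a Python UDF, which keeps the computation in the JVM. A minimal sketch under the assumption that arrays_zip names the struct fields after the input columns la and lb (this is an alternative, not the approach above, and it yields null rather than 0.0 when c0 is null):

# Fold the (a, b) pairs of all previous rows into c0: acc -> acc * b_i + a_i
df1.withColumn(
    "result",
    F.expr("""
        aggregate(
            slice(arrays_zip(la, lb), 1, size(la) - 1),
            cast(c0 as double),
            (acc, x) -> acc * x.lb + x.la
        )
    """)
).show()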