Complex logic on a PySpark DataFrame involving a previous row's existing value as well as a previous row's value generated on the fly

Tags:

pyspark

I have to apply logic to a Spark DataFrame or RDD (preferably a DataFrame) that requires generating two extra columns. The first generated column depends on other columns of the same row, and the second generated column depends on the first generated column of the previous row.

Below is a representation of the problem statement in tabular format. Columns A and B are available in the DataFrame; columns C and D are to be generated.

A |  B   | C            |     D
------------------------------------
1 | 100  |  default val |    C1-B1
2 | 200  |  D1-C1       |    C2-B2
3 | 300  |  D2-C2       |    C3-B3
4 | 400  |  D3-C3       |    C4-B4
5 | 500  |  D4-C4       |    C5-B5

Here is the sample data with the expected output:

A |  B   |    C   |   D
------------------------
1 | 100  |   1000 |  900
2 | 200  |  -100  | -300
3 | 300  |  -200  | -500
4 | 400  |  -300  | -700
5 | 500  |  -400  | -900

The only solution I can think of is to coalesce the input DataFrame to 1 partition, convert it to an RDD, and then apply a Python function (containing all the calculation logic) via the mapPartitions API. However, this approach puts all the load on one executor.
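For reference, here is a minimal sketch of that fallback (assuming the A and B columns shown above; the 1000 default and the subtraction stand in for my actual logic):

def compute_partition(rows):
    # Carry the previous row's C and D across the single partition.
    prev_c = prev_d = None
    for row in rows:
        c = 1000 if prev_c is None else prev_d - prev_c  # default for the first row
        d = c - row['B']
        prev_c, prev_d = c, d
        yield (row['A'], row['B'], c, d)

result = (
    df.coalesce(1)                     # everything ends up on one executor
      .sortWithinPartitions('A')       # guarantee row order inside the partition
      .rdd
      .mapPartitions(compute_partition)
      .toDF(['A', 'B', 'C', 'D'])
)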

asked Sep 18 '25 by Prateek Pathak


2 Answers

The lag() function may help you with that:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.orderBy("A")

# df1 is the input DataFrame with columns A and B.
# Seed C with the default value on every row.
df1 = df1.withColumn("C", F.lit(1000))

df2 = (
    df1
    # First pass: D from the seeded C.
    .withColumn("D", F.col("C") - F.col("B"))
    # Second pass: C from the previous row's D and C; the first row keeps the default.
    .withColumn("C",
                F.when(F.lag("C").over(w).isNotNull(),
                       F.lag("D").over(w) - F.lag("C").over(w))
                 .otherwise(F.col("C")))
    # Final pass: recompute D from the corrected C.
    .withColumn("D", F.col("C") - F.col("B"))
)
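With the sample data from the question, df2.show() reproduces the expected table. One caveat: Window.orderBy without a partitionBy also moves all rows into a single partition (Spark logs a warning about it), so this carries the same single-executor cost the question mentions.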

answered Sep 19 '25 by Thales Souto


Looking at it mathematically: D1 - C1, where D1 = C1 - B1, becomes C1 - B1 - C1 = -B1. In PySpark, the lag window function has a parameter called default; this should simplify your problem. Try this:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([(1,100),(2,200),(3,300),(4,400),(5,500)],['a','b'])
w = Window.orderBy('a')

# c is -b of the previous row; the first row falls back to the default 1000.
df_lag = df.withColumn('c', F.lag(F.col('b') * -1, default=1000).over(w))
df_final = df_lag.withColumn('d', F.col('c') - F.col('b'))

Results:

df_final.show()
+---+---+----+----+
|  a|  b|   c|   d|
+---+---+----+----+
|  1|100|1000| 900|
|  2|200|-100|-300|
|  3|300|-200|-500|
|  4|400|-300|-700|
|  5|500|-400|-900|
+---+---+----+----+

If the operation is something more complex than subtraction, the same logic applies: fill column C with your default value, calculate D, then use lag to calculate C, and recalculate D.
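Here is a minimal sketch of that recipe, with hypothetical calc_c and calc_d helpers standing in for your own operations. Note that, as the algebra above shows, a single pass works here because the recurrence collapses to the previous row's B; a recurrence that truly needs the previously computed C cannot be resolved by one lag pass.

import pyspark.sql.functions as F
from pyspark.sql import Window

def calc_d(c, b):            # D for the current row
    return c - b

def calc_c(prev_d, prev_c):  # C from the previous row's D and C
    return prev_d - prev_c

w = Window.orderBy('a')

df_tmp = (
    df.withColumn('c', F.lit(1000))                     # 1. fill C with the default
      .withColumn('d', calc_d(F.col('c'), F.col('b')))  # 2. calculate D
      .withColumn('c',                                  # 3. use lag to calculate C
                  F.when(F.lag('c').over(w).isNotNull(),
                         calc_c(F.lag('d').over(w), F.lag('c').over(w)))
                   .otherwise(F.col('c')))
)
df_final = df_tmp.withColumn('d', calc_d(F.col('c'), F.col('b')))  # 4. recalculate D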

answered Sep 19 '25 by Raghu