For every row in a PySpark DataFrame, I am trying to get a value from the closest preceding row that satisfies a certain condition.
That is, if my DataFrame looks like this:
X | Flag
1 | 1
2 | 0
3 | 0
4 | 0
5 | 1
6 | 0
7 | 0
8 | 0
9 | 1
10 | 0
I want output that looks like this:
X | Lag_X | Flag
1 | NULL | 1
2 | 1 | 0
3 | 1 | 0
4 | 1 | 0
5 | 1 | 1
6 | 5 | 0
7 | 5 | 0
8 | 5 | 0
9 | 5 | 1
10 | 9 | 0
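To pin down the rule behind this table, here is a minimal plain-Python sketch of it (no Spark involved). The function name `lag_x_reference` is made up for illustration, and it assumes the rows are already sorted by X:

```python
from typing import List, Optional, Tuple


def lag_x_reference(rows: List[Tuple[int, int]]) -> List[Tuple[int, Optional[int], int]]:
    """Return (X, Lag_X, Flag) for rows of (X, Flag) sorted by X.

    Lag_X is the X of the closest strictly preceding row whose Flag is 1,
    or None when no such row exists.
    """
    out = []
    last_flagged = None  # X of the most recent row seen so far with Flag == 1
    for x, flag in rows:
        out.append((x, last_flagged, flag))
        # Update only after emitting, so a row never refers to itself.
        if flag == 1:
            last_flagged = x
    return out


rows = [(1, 1), (2, 0), (3, 0), (4, 0), (5, 1),
        (6, 0), (7, 0), (8, 0), (9, 1), (10, 0)]

for row in lag_x_reference(rows):
    print(row)
```

Any Spark solution should reproduce this mapping on the sample data.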
I thought I could do this with the lag function and a WindowSpec. Unfortunately, WindowSpec doesn't support .filter or .when, so this does not work:
conditional_window = Window().orderBy("X").filter(df["Flag"] == 1)
df = df.withColumn('lag_x', f.lag(df["X"], 1).over(conditional_window))
It seems like this should be simple, but I have been racking my brain trying to find a solution, so any help would be greatly appreciated.
The question is old, but I thought the answer might help others.
Here is a working solution using window and lag functions:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
# Get or create a SparkSession (createDataFrame is a SparkSession method, not a SparkContext one)
spark = SparkSession.builder.getOrCreate()
# Create DataFrame
a = spark.createDataFrame([(1, 1),
                           (2, 0),
                           (3, 0),
                           (4, 0),
                           (5, 1),
                           (6, 0),
                           (7, 0),
                           (8, 0),
                           (9, 1),
                           (10, 0)],
                          ['X', 'Flag'])
# Use a window ordered by X
win = Window.orderBy("X")
# Condition: the preceding row's "Flag" is not 0
condition = F.lag(F.col("Flag"), 1).over(win) != 0
# Add a new column: if the condition is true, take the value of column "X" from the previous row
# F.lag reads the previous row's X directly, so X need not be consecutive integers
a = a.withColumn("Flag_X", F.when(condition, F.lag(F.col("X"), 1).over(win)))
Now we obtain the DataFrame shown below:
+---+----+------+
| X|Flag|Flag_X|
+---+----+------+
| 1| 1| null|
| 2| 0| 1|
| 3| 0| null|
| 4| 0| null|
| 5| 1| null|
| 6| 0| 5|
| 7| 0| null|
| 8| 0| null|
| 9| 1| null|
| 10| 0| 9|
+---+----+------+
To fill the null values with the last non-null value seen so far:
a = a.withColumn("Flag_X",
                 F.last(F.col("Flag_X"), ignorenulls=True).over(win))
So the final DataFrame is as required:
+---+----+------+
| X|Flag|Flag_X|
+---+----+------+
| 1| 1| null|
| 2| 0| 1|
| 3| 0| 1|
| 4| 0| 1|
| 5| 1| 1|
| 6| 0| 5|
| 7| 0| 5|
| 8| 0| 5|
| 9| 1| 5|
| 10| 0| 9|
+---+----+------+