
Pyspark Dataframe: Get previous row that meets a condition

For every row in a PySpark DataFrame, I am trying to get the value from the first preceding row that satisfies a certain condition.

That is, if my DataFrame looks like this:

X  | Flag
1  | 1
2  | 0
3  | 0
4  | 0
5  | 1
6  | 0
7  | 0
8  | 0
9  | 1
10 | 0

I want output that looks like this:

X  | Lag_X | Flag
1  | NULL  | 1
2  | 1     | 0
3  | 1     | 0
4  | 1     | 0
5  | 1     | 1
6  | 5     | 0
7  | 5     | 0
8  | 5     | 0
9  | 5     | 1
10 | 9     | 0

I thought I could do this with the lag function and a WindowSpec; unfortunately, WindowSpec doesn't support .filter or .when, so this does not work:

conditional_window = Window().orderBy("X").filter(df["Flag"] == 1)
df = df.withColumn('lag_x', f.lag(df["X"], 1).over(conditional_window))

It seems like this should be simple, but I have been racking my brain trying to find a solution, so any help with this would be greatly appreciated.

Asked by NME IX on Mar 27 '18


People also ask

How do you get the last row in PySpark?

Use the tail() action to get the last N rows of a DataFrame; it returns a list of Row objects in PySpark and an Array[Row] in Spark with Scala. Remember that tail() moves the selected rows to the Spark driver, so keep the request small enough to fit in the driver's memory.
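
For example, a minimal sketch (assuming Spark 3.0+ and a DataFrame named df already exists):

# Pull the last 3 rows to the driver as a list of Row objects
last_rows = df.tail(3)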

How do I select specific rows in PySpark?

Use select() together with collect(): select() picks the columns to return from the PySpark DataFrame, and collect() brings the resulting rows back to the driver as a list, from which a particular row can be taken.
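
For example, a minimal sketch (assuming a DataFrame df with a column named X):

# Keep only column X, bring the rows to the driver, and index into the
# resulting list to get a particular row (here the third one)
third_row = df.select("X").collect()[2]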

What does .collect do in PySpark?

collect() is the operation on an RDD or DataFrame that retrieves the data from the DataFrame. It is useful for retrieving all the elements of every row from each partition and bringing them to the driver node/program.
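
For example (assuming a DataFrame df with a column named Flag):

# Bring every row to the driver as a list of Row objects
rows = df.collect()
first_flag = rows[0]["Flag"]  # access a field of the first Row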

What is toDF PySpark?

toDF() is a PySpark method used to create a DataFrame from an RDD. Once the RDD is converted to a DataFrame, the data becomes more organized and easier to analyze.
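
For example, a minimal sketch (assuming an active SparkSession named spark):

# Build an RDD of tuples and convert it to a DataFrame with named columns
rdd = spark.sparkContext.parallelize([(1, 1), (2, 0)])
df_from_rdd = rdd.toDF(["X", "Flag"])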


1 Answer

The question is old, but I thought the answer might help others.

Here is a working solution using the window and lag functions:

from pyspark.sql import functions as F
from pyspark.sql import SparkSession, Window

# Get (or create) a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the DataFrame
a = spark.createDataFrame([(1, 1),
                           (2, 0),
                           (3, 0),
                           (4, 0),
                           (5, 1),
                           (6, 0),
                           (7, 0),
                           (8, 0),
                           (9, 1),
                           (10, 0)],
                          ['X', 'Flag'])

# Use a window function (no partitioning, ordered by X)
win = Window.orderBy("X")
# Condition: the preceding row's value in column "Flag" is not 0
condition = F.lag(F.col("Flag"), 1).over(win) != 0
# New column: if the condition is true, take X - 1, which is the previous
# row's X here because X increases by exactly 1
a = a.withColumn("Flag_X", F.when(condition, F.col("X") - 1))

We now obtain the DataFrame shown below:

+---+----+------+
|  X|Flag|Flag_X|
+---+----+------+
|  1|   1|  null|
|  2|   0|     1|
|  3|   0|  null|
|  4|   0|  null|
|  5|   1|  null|
|  6|   0|     5|
|  7|   0|  null|
|  8|   0|  null|
|  9|   1|  null|
| 10|   0|     9|
+---+----+------+

To fill the null values, carry the last non-null value forward with last() and ignorenulls=True:

a = a.withColumn("Flag_X",
                 F.last(F.col("Flag_X"), ignorenulls=True).over(win))

So the final DataFrame is as required:

+---+----+------+
|  X|Flag|Flag_X|
+---+----+------+
|  1|   1|  null|
|  2|   0|     1|
|  3|   0|     1|
|  4|   0|     1|
|  5|   1|     1|
|  6|   0|     5|
|  7|   0|     5|
|  8|   0|     5|
|  9|   1|     5|
| 10|   0|     9|
+---+----+------+
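
Note that F.col("X") - 1 reproduces the previous row's X only because X increases by exactly 1 in this data. A more general sketch, using the same win window, lags X itself instead:

# Take the previous row's X only where that previous row's Flag was 1,
# then carry the value forward; this does not rely on X being consecutive
a = a.withColumn(
    "Flag_X",
    F.when(F.lag("Flag", 1).over(win) == 1, F.lag("X", 1).over(win))
)
a = a.withColumn("Flag_X", F.last("Flag_X", ignorenulls=True).over(win))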
Answered by Mike on Nov 14 '22