Using Spark 1.5.1,
I've been trying to forward fill null values with the last known observation for one column of my DataFrame.
It is possible to start with a null value and for this case I would to backward fill this null value with the first knwn observation. However, If that too complicates the code, this point can be skipped.
In this post, a solution in Scala was provided for a very similar problem by zero323.
But, I don't know Scala and I don't succeed to ''translate'' it in Pyspark API code. It's possible to do it with Pyspark ?
Thanks for your help.
Below, a simple example sample input:
| cookie_ID | Time | User_ID
| ------------- | -------- |-------------
| 1 | 2015-12-01 | null
| 1 | 2015-12-02 | U1
| 1 | 2015-12-03 | U1
| 1 | 2015-12-04 | null
| 1 | 2015-12-05 | null
| 1 | 2015-12-06 | U2
| 1 | 2015-12-07 | null
| 1 | 2015-12-08 | U1
| 1 | 2015-12-09 | null
| 2 | 2015-12-03 | null
| 2 | 2015-12-04 | U3
| 2 | 2015-12-05 | null
| 2 | 2015-12-06 | U4
And the expected output:
| cookie_ID | Time | User_ID
| ------------- | -------- |-------------
| 1 | 2015-12-01 | U1
| 1 | 2015-12-02 | U1
| 1 | 2015-12-03 | U1
| 1 | 2015-12-04 | U1
| 1 | 2015-12-05 | U1
| 1 | 2015-12-06 | U2
| 1 | 2015-12-07 | U2
| 1 | 2015-12-08 | U1
| 1 | 2015-12-09 | U1
| 2 | 2015-12-03 | U3
| 2 | 2015-12-04 | U3
| 2 | 2015-12-05 | U3
| 2 | 2015-12-06 | U4
Use tail() action to get the Last N rows from a DataFrame, this returns a list of class Row for PySpark and Array[Row] for Spark with Scala.
Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.
PySpark map() Transformation is used to loop/iterate through the PySpark DataFrame/RDD by applying the transformation function (lambda) on every element (Rows and Columns) of RDD/DataFrame.
Another workaround to get this working, is to try something like this:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
window = (
Window
.partitionBy('cookie_id')
.orderBy('Time')
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
final = (
joined
.withColumn('UserIDFilled', F.last('User_ID', ignorenulls=True).over(window))
)
So what this is doing is that it constructs your window based on the partition key and the order column. It also tells the window to look back all rows within the window up to the current row. Finally, at each row, you return the last value that is not null (which remember, according to your window, it includes your current row)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With