Is there a way to replace null values in a PySpark DataFrame with the last valid value? There are additional timestamp and session columns if you think you need them for window partitioning and ordering. More specifically, I'd like to achieve the following conversion:
+---------+-----------+-----------+    +---------+-----------+-----------+
| session | timestamp |        id |    | session | timestamp |        id |
+---------+-----------+-----------+    +---------+-----------+-----------+
|       1 |         1 |      null |    |       1 |         1 |      null |
|       1 |         2 |       109 |    |       1 |         2 |       109 |
|       1 |         3 |      null |    |       1 |         3 |       109 |
|       1 |         4 |      null |    |       1 |         4 |       109 |
|       1 |         5 |       109 | => |       1 |         5 |       109 |
|       1 |         6 |      null |    |       1 |         6 |       109 |
|       1 |         7 |       110 |    |       1 |         7 |       110 |
|       1 |         8 |      null |    |       1 |         8 |       110 |
|       1 |         9 |      null |    |       1 |         9 |       110 |
|       1 |        10 |      null |    |       1 |        10 |       110 |
+---------+-----------+-----------+    +---------+-----------+-----------+
Replacing null values is one of the most common operations on PySpark DataFrames. DataFrame.fillna() and DataFrameNaFunctions.fill() replace NULL/None values in all or selected columns with a constant literal such as zero, an empty string, a space, or any other fixed value.
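For contrast, here is a minimal, self-contained sketch (column names borrowed from the question) showing that fillna() only substitutes a constant, not the previous non-null value:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1, None), (1, 2, 109), (1, 3, None)],
    ['session', 'timestamp', 'id'])

# Every null id becomes 0 -- fillna() cannot carry 109 forward to timestamp 3.
df.fillna({'id': 0}).show()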
In PySpark, to add a new constant column to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column type; if you want to add a NULL/None column, use lit(None).
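As an aside, a small sketch of lit() and lit(None), reusing the df from the snippet above (the new column names are purely illustrative):

from pyspark.sql.functions import lit

# Add a constant column and an explicitly null column; the cast gives the
# null column a concrete type instead of NullType.
df.withColumn('source', lit('web')) \
  .withColumn('parent_id', lit(None).cast('long')) \
  .show()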
I believe I have a much simpler solution than the accepted. It is using Functions too, but uses the function called 'LAST' and ignores nulls.
Let's re-create something similar to the original data:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

d = [{'session': 1, 'ts': 1},
     {'session': 1, 'ts': 2, 'id': 109},
     {'session': 1, 'ts': 3},
     {'session': 1, 'ts': 4, 'id': 110},
     {'session': 1, 'ts': 5},
     {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)
Calling df.show() prints:
+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3|null|
|      1|  4| 110|
|      1|  5|null|
|      1|  6|null|
+-------+---+----+
Now, if we use the window function LAST:
df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()
We just get:
+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3| 109|
|      1|  4| 110|
|      1|  5| 110|
|      1|  6| 110|
+-------+---+----+
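If you prefer not to reach for sys.maxsize, the same window frame can be written with the boundary constants instead; this is just an equivalent sketch of the line above (on recent Spark versions):

from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Unbounded-preceding-to-current-row frame, same as rowsBetween(-sys.maxsize, 0).
w = (Window.partitionBy('session')
           .orderBy('ts')
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# last(..., ignorenulls=True) returns the most recent non-null id for each row.
df.withColumn('id', func.last('id', ignorenulls=True).over(w)).show()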
Hope it helps!
This seems to be doing the trick using Window functions:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

def fill_nulls(df):
    # Replace nulls with a -1 sentinel so they can be told apart from real ids.
    df_na = df.na.fill(-1)

    # Previous id within the session.
    lag = df_na.withColumn(
        'id_lag',
        func.lag('id', default=-1).over(
            Window.partitionBy('session').orderBy('timestamp')))

    # Flag rows where a new, valid id starts.
    switch = lag.withColumn(
        'id_change',
        ((lag['id'] != lag['id_lag']) & (lag['id'] != -1)).cast('integer'))

    # Running sum of the flags gives a sub-session per contiguous id block.
    switch_sess = switch.withColumn(
        'sub_session',
        func.sum('id_change').over(
            Window.partitionBy('session')
                  .orderBy('timestamp')
                  .rowsBetween(-sys.maxsize, 0)))

    # The first id of each sub-session is the value to propagate.
    fid = switch_sess.withColumn(
        'nn_id',
        func.first('id').over(
            Window.partitionBy('session', 'sub_session').orderBy('timestamp')))

    # Turn the -1 sentinel back into a real null.
    fid_na = fid.replace(-1, None)

    ff = (fid_na.drop('id').drop('id_lag')
                .drop('id_change').drop('sub_session')
                .withColumnRenamed('nn_id', 'id'))
    return ff
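For completeness, a hypothetical usage sketch on data shaped like the question's (assuming an active spark session and the fill_nulls function above):

data = [(1, 1, None), (1, 2, 109), (1, 3, None), (1, 4, None), (1, 5, 109),
        (1, 6, None), (1, 7, 110), (1, 8, None), (1, 9, None), (1, 10, None)]
sample = spark.createDataFrame(data, ['session', 'timestamp', 'id'])

# Should reproduce the target table from the question, with each null id
# replaced by the last valid value in its session.
fill_nulls(sample).orderBy('session', 'timestamp').show()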
Here is the full null_test.py.