 

Fill in null with previously known good value with pyspark

Is there a way to replace null values in a pyspark dataframe with the last valid value? There are additional timestamp and session columns if you think you need them for window partitioning and ordering. More specifically, I'd like to achieve the following conversion:

+---------+-----------+-----------+      +---------+-----------+-----------+
| session | timestamp |         id|      | session | timestamp |         id|
+---------+-----------+-----------+      +---------+-----------+-----------+
|        1|          1|       null|      |        1|          1|       null|
|        1|          2|        109|      |        1|          2|        109|
|        1|          3|       null|      |        1|          3|        109|
|        1|          4|       null|      |        1|          4|        109|
|        1|          5|        109| =>   |        1|          5|        109|
|        1|          6|       null|      |        1|          6|        109|
|        1|          7|        110|      |        1|          7|        110|
|        1|          8|       null|      |        1|          8|        110|
|        1|          9|       null|      |        1|          9|        110|
|        1|         10|       null|      |        1|         10|        110|
+---------+-----------+-----------+      +---------+-----------+-----------+
asked Mar 31 '16 by Oleksiy


People also ask

How do you populate NULL values in PySpark?

In PySpark, DataFrame.fillna() or DataFrameNaFunctions.fill() is used to replace NULL/None values in all or selected DataFrame columns with zero (0), an empty string, a space, or any constant literal value.
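For example, a minimal sketch (the toy id/label columns here are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(None, 'a'), (2, None)], ['id', 'label'])

df.fillna(0).show()                     # fills nulls in numeric columns only
df.fillna({'label': 'unknown'}).show()  # fills just the named column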

How do I change a value to NULL in Spark DataFrame?

The replacement of null values in PySpark DataFrames is one of the most common operations undertaken. This can be achieved using either the DataFrame.fillna() or DataFrameNaFunctions.fill() method.
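The snippet above covers filling nulls; for the literal question of changing an existing value to NULL, one common approach (not shown in the answer above, and using a hypothetical 'id' column) is when()/otherwise() with lit(None):

from pyspark.sql import functions as F

# Turn a sentinel value into a real NULL in a hypothetical 'id' column
df = df.withColumn('id', F.when(F.col('id') == -1, F.lit(None)).otherwise(F.col('id')))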

How do you assign a NULL to a column in PySpark?

In PySpark, to add a new column to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes a constant value and returns a Column type; if you want to add a NULL/None, use lit(None).
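A short sketch (the column name is arbitrary); note that lit(None) typically needs an explicit cast so the new column gets a concrete type:

from pyspark.sql.functions import lit

# Add a column that is NULL on every row, typed as string
df = df.withColumn('new_col', lit(None).cast('string'))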


2 Answers

I believe I have a much simpler solution than the accepted one. It also uses window functions, but relies on last() with ignorenulls=True so that null values are skipped.

Let's re-create something similar to the original data:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Sample data: 'id' is missing (null) on most rows
d = [{'session': 1, 'ts': 1},
     {'session': 1, 'ts': 2, 'id': 109},
     {'session': 1, 'ts': 3},
     {'session': 1, 'ts': 4, 'id': 110},
     {'session': 1, 'ts': 5},
     {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)
df.show()

This prints:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3|null|
|      1|  4| 110|
|      1|  5|null|
|      1|  6|null|
+-------+---+----+

Now, if we use the window function last() with ignorenulls=True:

df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show() 

We just get:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3| 109|
|      1|  4| 110|
|      1|  5| 110|
|      1|  6| 110|
+-------+---+----+
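As a side note, on reasonably recent PySpark versions the same unbounded frame is usually written with the built-in window constants rather than -sys.maxsize; assuming a current version, this is equivalent:

w = (Window.partitionBy('session')
           .orderBy('ts')
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

df.withColumn('id', func.last('id', ignorenulls=True).over(w)).show()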

Hope it helps!

answered Oct 04 '22 by elmosca


This seems to be doing the trick using Window functions:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

def fill_nulls(df):
    # Use -1 as a sentinel for missing ids
    df_na = df.na.fill(-1)
    # Previous row's id within each session
    lag = df_na.withColumn('id_lag', func.lag('id', default=-1)
                           .over(Window.partitionBy('session')
                                 .orderBy('timestamp')))
    # Flag rows where a new non-null id appears
    switch = lag.withColumn('id_change',
                            ((lag['id'] != lag['id_lag']) &
                             (lag['id'] != -1)).cast('integer'))
    # Running sum of the flags splits each session into sub-sessions
    switch_sess = switch.withColumn(
        'sub_session',
        func.sum('id_change')
        .over(Window.partitionBy('session')
              .orderBy('timestamp')
              .rowsBetween(-sys.maxsize, 0)))
    # The first id of each sub-session is the value to propagate
    fid = switch_sess.withColumn('nn_id',
                                 func.first('id')
                                 .over(Window.partitionBy('session', 'sub_session')
                                       .orderBy('timestamp')))
    # Turn the -1 sentinel back into a real null
    fid_na = fid.replace(-1, None)
    ff = (fid_na.drop('id').drop('id_lag')
                .drop('id_change')
                .drop('sub_session')
                .withColumnRenamed('nn_id', 'id'))
    return ff
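A quick usage sketch, assuming a DataFrame with the question's session/timestamp/id schema:

d = [{'session': 1, 'timestamp': 1},
     {'session': 1, 'timestamp': 2, 'id': 109},
     {'session': 1, 'timestamp': 3},
     {'session': 1, 'timestamp': 4}]
df = spark.createDataFrame(d)

fill_nulls(df).show()  # id comes back as: null, 109, 109, 109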

Here is the full null_test.py.

answered Oct 04 '22 by Oleksiy