
How to conditionally replace value in a column based on evaluation of expression based on another column in Pyspark?

import numpy as np

df = spark.createDataFrame(
    [(1, 1, None),
     (1, 2, float(5)),
     (1, 3, np.nan),
     (1, 4, None),
     (0, 5, float(10)),
     (1, 6, float('nan')),
     (0, 6, float('nan'))],
    ('session', "timestamp1", "id2"))
+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
|      1|         1|null|
|      1|         2| 5.0|
|      1|         3| NaN|
|      1|         4|null|
|      0|         5|10.0|
|      1|         6| NaN|
|      0|         6| NaN|
+-------+----------+----+

How can I replace the value of the timestamp1 column with 999 when session == 0?

Expected output

+-------+----------+----+
|session|timestamp1| id2|
+-------+----------+----+
|      1|         1|null|
|      1|         2| 5.0|
|      1|         3| NaN|
|      1|         4|null|
|      0|       999|10.0|
|      1|         6| NaN|
|      0|       999| NaN|
+-------+----------+----+

Is it possible to do it using replace() in PySpark?

asked Jun 27 '17 by GeorgeOfTheRF

People also ask

How do you replace values in a column based on condition in PySpark?

You can replace column values of a PySpark DataFrame using the SQL string functions regexp_replace(), translate(), and overlay().

How do I change the value of an existing column in PySpark?

You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Since DataFrames are distributed, immutable collections, you can't really change column values in place; when you "change" a value with withColumn() or any other approach, PySpark returns a new DataFrame with the updated values.

How do you use coalesce in PySpark?

DataFrame.coalesce(n) merges existing partitions without a full shuffle; you can check the result by converting to the underlying RDD and reading its number of partitions with getNumPartitions(). Note that coalesce() can only reduce the partition count, never increase it beyond the current number; to increase partitions, use repartition() instead.

How do you rename column values in PySpark?

Method 1: Using withColumnRenamed(). Use the withColumnRenamed(existing, new) method to change a column name of a PySpark DataFrame: existing is the current column name, new is the replacement name. It returns a new DataFrame with the column renamed.


1 Answer

You should be using the when (with otherwise) function:

from pyspark.sql.functions import when

targetDf = df.withColumn(
    "timestamp1",
    when(df["session"] == 0, 999).otherwise(df["timestamp1"]))
answered Oct 13 '22 by Assaf Mendelson