
update a dataframe column with new values

df1 has fields id and json; df2 has fields id and json.

df1.count() => 1200; df2.count() => 20

df1 has all the rows. df2 has an incremental update with just 20 rows.

My goal is to update df1 with the values from df2. All the ids in df2 are in df1, but df2 has updated values (in the json field) for those same ids.

The resulting DataFrame should have all the rows from df1, with the updated values from df2 where they exist.

What is the best way to do this, with the fewest joins and filters?

Thanks!

suprita shankar asked Mar 23 '18 04:03




1 Answer

You can achieve this using one left join.

Create Example DataFrames

Using the sample data provided by @Shankar Koirala in his answer (the value column here stands in for the json field from the question).

# sqlCtx is assumed to be an existing SQLContext (a SparkSession works the same way)
data1 = [
  (1, "a"),
  (2, "b"),
  (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
  (1, "x"), 
  (2, "y")
]

df2 = sqlCtx.createDataFrame(data2, ["id", "value"])

Do a left join

Join the two DataFrames using a left join on the id column. This keeps all of the rows in the left DataFrame. For rows in the left DataFrame whose id has no match in the right DataFrame, the right value will be null.

import pyspark.sql.functions as f
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
         f.col('l.value').alias('left_value'),
         f.col('r.value').alias('right_value')
    )\
    .show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#|  1|         a|          x|
#|  3|         c|       null|
#|  2|         b|          y|
#+---+----------+-----------+
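To make the join semantics concrete, here is a minimal plain-Python sketch of the same left join. It is only a model of the behaviour, not Spark code, and the helper name left_join is hypothetical. It also shows a side effect worth keeping in mind: if an id appeared more than once in the right-hand data, the join would emit one row per match.

```python
# Plain-Python model of a left join on "id" (illustrative only, not Spark code).
def left_join(left_rows, right_rows):
    """Return (id, left_value, right_value) tuples; right_value is None when unmatched."""
    right_by_id = {}
    for rid, rvalue in right_rows:
        right_by_id.setdefault(rid, []).append(rvalue)

    joined = []
    for lid, lvalue in left_rows:
        # Every left row is kept; an unmatched id pairs with a single None.
        for rvalue in right_by_id.get(lid, [None]):
            joined.append((lid, lvalue, rvalue))
    return joined


data1 = [(1, "a"), (2, "b"), (3, "c")]
data2 = [(1, "x"), (2, "y")]

joined = left_join(data1, data2)
print(joined)  # [(1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', None)]

# A duplicated id on the right side yields one output row per match.
duplicated = left_join(data1, [(1, "x"), (1, "z")])
print(duplicated)  # [(1, 'a', 'x'), (1, 'a', 'z'), (2, 'b', None), (3, 'c', None)]
```

So the one-join approach relies on each id appearing at most once in df2; if that is not guaranteed, deduplicating df2 first is probably the safer route.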

Select the desired data

We will use the fact that the unmatched ids have a null to select the final columns. Use pyspark.sql.functions.when() to use the right value if it is not null, otherwise keep the left value.

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+

You can sort this output if you want the ids in order.
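The when/otherwise expression above is a "first non-null value" rule. PySpark also provides pyspark.sql.functions.coalesce(), which returns the first non-null column among its arguments, so f.coalesce(f.col('r.value'), f.col('l.value')) should give the same result. The plain-Python sketch below models that rule on the joined rows; the coalesce helper here is a hypothetical stand-in written for illustration, not the Spark function.

```python
# Plain-Python model of the null-fallback step (illustrative only, not Spark code).
def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    for v in values:
        if v is not None:
            return v
    return None


# Rows as produced by the left join: (id, left_value, right_value).
joined = [(1, "a", "x"), (3, "c", None), (2, "b", "y")]

# Prefer the right (updated) value; fall back to the left (original) one.
updated = [(row_id, coalesce(right, left)) for row_id, left, right in joined]
print(sorted(updated))  # [(1, 'x'), (2, 'y'), (3, 'c')]
```

One caveat with either form: a null that df2 carries as a genuine update cannot be told apart from "no match", so the original value would be kept in that case.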


Using pyspark-sql

You can do the same thing using a pyspark-sql query:

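# registerTempTable() is deprecated since Spark 2.0; createOrReplaceTempView() replaces it
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

query = """SELECT l.id,
CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
# Spark SQL accepts multi-line queries, so the newlines do not need to be stripped
sqlCtx.sql(query).show()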
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  3|    c|
#|  2|    y|
#+---+-----+

pault answered Oct 01 '22 21:10