df1 has fields id and json; df2 has fields id and json.

df1.count() => 1200; df2.count() => 20

df1 has all the rows. df2 has an incremental update with just 20 rows. My goal is to update df1 with the values from df2. All the ids of df2 are in df1, but df2 has updated values (in the json field) for those same ids. The resulting df should have all the values from df1 and the updated values from df2.
What is the best way to do this, with the least number of joins and filters?
Thanks!
You can achieve this using one left join.
Create Example DataFrames
Using the sample data provided by @Shankar Koirala in his answer.
# df1 stands in for the full table; df2 is the incremental update
# (ids 1 and 2 receive new values)
data1 = [
    (1, "a"),
    (2, "b"),
    (3, "c")
]
df1 = sqlCtx.createDataFrame(data1, ["id", "value"])

data2 = [
    (1, "x"),
    (2, "y")
]
df2 = sqlCtx.createDataFrame(data2, ["id", "value"])
Do a left join
Join the two DataFrames using a left join on the id column. This will keep all of the rows in the left DataFrame. For the rows in the right DataFrame that don't have a matching id, the value will be null.
import pyspark.sql.functions as f

df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.col('l.value').alias('left_value'),
        f.col('r.value').alias('right_value')
    )\
    .show()
#+---+----------+-----------+
#| id|left_value|right_value|
#+---+----------+-----------+
#| 1| a| x|
#| 3| c| null|
#| 2| b| y|
#+---+----------+-----------+
Select the desired data
We will use the fact that the unmatched ids have a null to select the final columns. Use pyspark.sql.functions.when() to take the right value if it is not null, and otherwise keep the left value.
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('r.value')),
            f.col('r.value')
        ).otherwise(f.col('l.value')).alias('value')
    )\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| x|
#| 3| c|
#| 2| y|
#+---+-----+
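An equivalent, slightly more compact way to write the same selection is pyspark.sql.functions.coalesce(), which returns its first non-null argument. This is a sketch using the same df1 and df2 as above:

# coalesce() picks r.value when the id was updated, and falls back
# to l.value for the ids that df2 does not contain
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select('id', f.coalesce(f.col('r.value'), f.col('l.value')).alias('value'))\
    .show()

This produces the same output as the when()/otherwise() version.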
You can sort this output if you want the ids in order, as shown below.
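For example, append orderBy('id') to the chain before show():

# Sort the result by id before displaying it
df1.alias('l').join(df2.alias('r'), on='id', how='left')\
    .select('id', f.coalesce(f.col('r.value'), f.col('l.value')).alias('value'))\
    .orderBy('id')\
    .show()
#+---+-----+
#| id|value|
#+---+-----+
#|  1|    x|
#|  2|    y|
#|  3|    c|
#+---+-----+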
Using pyspark-sql
You can do the same thing using a pyspark-sql query:
df1.registerTempTable('df1')
df2.registerTempTable('df2')

query = """SELECT l.id,
       CASE WHEN r.value IS NOT NULL THEN r.value ELSE l.value END AS value
FROM df1 l LEFT JOIN df2 r ON l.id = r.id"""
sqlCtx.sql(query).show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| x|
#| 3| c|
#| 2| y|
#+---+-----+
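Note that registerTempTable() is deprecated since Spark 2.0. On newer versions, the equivalent calls (assuming a SparkSession named spark) are:

# Spark 2.0+: register temp views on the DataFrames and run SQL via the SparkSession
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
spark.sql(query).show()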