I have a data frame in PySpark like below:
df.show()
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| null| Poland|
| 815518| MA401|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
+-----------+------------+-------------+
I have another data frame like below:
df1.show()
+-----------+------------+
|customer_id|product_name|
+-----------+------------+
| 12870946| GS748TS|
| 815518| MA402|
| 3138420| null|
| 3178864| WGR614v6|
| 7456796| XE102|
| 21893468| AGM731F|
| null| AE171|
+-----------+------------+
Now I want to do a full outer join on these data frames and update the product_name column values like below.
1) Overwrite the values in `df` using the values in `df1` if there are values in `df1`.
2) If there are `null` values or no values in `df1`, leave the values in `df` as they are.
Expected result:
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| GS748TS| Poland|
| 815518| MA402|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
| null| AE171| null|
+-----------+------------+-------------+
I have tried the following:
import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.coalesce(df.product_name, df1.product_name).alias('product_name'),
        df.country
    )
But the result I am getting is different:
df2.show()
+-----------+------------+-------------+
|customer_id|product_name| country|
+-----------+------------+-------------+
| 12870946| null| Poland|
| 815518| MA401|United States|
| 3138420| WG111v2| UK|
| 3178864| WGR614v6|United States|
| 7456796| XE102|United States|
| 21893468| AGM731F|United States|
| null| AE171| null|
+-----------+------------+-------------+
How can I get the expected result?
The code you wrote produces the correct output for me, so I cannot reproduce your problem. I have seen other posts where using an alias when doing joins has resolved issues, so here is a slightly modified version of your code which will do the same thing:
import pyspark.sql.functions as f
df.alias("r").join(df1.alias("l"), on="customer_id", how='full_outer')\
.select(
"customer_id",
f.coalesce("r.product_name", "l.product_name").alias('product_name'),
"country"
)\
.show()
#+-----------+------------+-------------+
#|customer_id|product_name| country|
#+-----------+------------+-------------+
#| 7456796| XE102|United States|
#| 3178864| WGR614v6|United States|
#| null| AE171| null|
#| 815518| MA401|United States|
#| 3138420| WG111v2| UK|
#| 12870946| GS748TS| Poland|
#| 21893468| AGM731F|United States|
#+-----------+------------+-------------+
I get the same results when I run your code as well (reproduced below):
df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.coalesce(df.product_name, df1.product_name).alias('product_name'),
        df.country
    )\
    .show()
I am using Spark 2.1 and Python 2.7.13.
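One thing worth noting: both versions keep df's product_name when both sides are non-null, which is why customer 815518 shows MA401 here rather than the MA402 in your expected output. If df1's value should win whenever it is non-null (your rule 1), swapping the coalesce arguments should do it. A minimal sketch, assuming the same df and df1 and that the data contains true nulls:

import pyspark.sql.functions as f

# Sketch: list df1's column first in coalesce so its non-null values
# overwrite df's, and df's value is used only as the fallback (rule 2).
df.alias("a").join(df1.alias("b"), on="customer_id", how='full_outer')\
    .select(
        "customer_id",
        f.coalesce("b.product_name", "a.product_name").alias('product_name'),
        "country"
    )\
    .show()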
Your code is perfect if the values are true nulls and not the string "null". But looking at the df2 dataframe you are getting, the values in product_name seem to be the string "null". You will have to check for the string "null" using the when and isnull built-in functions, as follows:
import pyspark.sql.functions as f
df2 = df.join(df1, df.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df.customer_id,
        f.when(f.isnull(df.product_name) | (df.product_name == "null"), df1.product_name)
         .otherwise(df.product_name)
         .alias('product_name'),
        df.country
    )
df2.show(truncate=False)
which should give you
+-----------+------------+-------------+
|customer_id|product_name|country      |
+-----------+------------+-------------+
|7456796    |XE102       |United States|
|3178864    |WGR614v6    |United States|
|815518     |MA401       |United States|
|3138420    |WG111v2     |UK           |
|12870946   |GS748TS     |Poland       |
|21893468   |AGM731F     |United States|
|null       |AE171       |null         |
+-----------+------------+-------------+
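Building on the same diagnosis, another option is to normalize the literal string "null" to a real NULL once up front, after which the plain coalesce approach from the question works unchanged. A minimal sketch, where df_clean is just a hypothetical intermediate name:

import pyspark.sql.functions as f

# Sketch: when() without otherwise() returns NULL whenever the condition is
# false or null, so both real nulls and the literal string "null" become NULL.
df_clean = df.withColumn(
    "product_name",
    f.when(f.col("product_name") != "null", f.col("product_name"))
)

df2 = df_clean.join(df1, df_clean.customer_id == df1.customer_id, 'full_outer')\
    .select(
        df_clean.customer_id,
        f.coalesce(df_clean.product_name, df1.product_name).alias('product_name'),
        df_clean.country
    )
df2.show(truncate=False)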