
How to join two DataFrames and conditionally replace one column in Spark

There are two DataFrames. For simplicity, they look like this:

DataFrame1

id | name
-----------
0  | Mike
1  | James

DataFrame2

id | name | salary
-------------------
0  | M    | 10
1  | J    | 20
2  | K    | 30

I want to join the two DataFrames on id and use the name column from DataFrame1 where the id exists there, while keeping the original name from DataFrame2 when there is no corresponding id in DataFrame1.

It should be:

id | name  | salary
--------------------
0  | Mike  |  10
1  | James |  20
2  | K     |  30

So far, I only know how to join the two DataFrames like this:

df1.join(df2, df1("id")===df2("id"), "left").select(df2("id"), df1("name"), df2("salary"))

But this does not keep the name value "K"; I get null instead.

Thanks!

Siyu Leng asked Dec 04 '16 15:12


2 Answers

You can use coalesce, which returns the first non-null value among the given columns. Also, with a left join, df2 must be the left side (join df2 to df1), not the other way around:

import org.apache.spark.sql.functions._

// df2 is the left side, so every row of df2 (including id 2) is kept
df2.join(df1, df1("id")===df2("id"), "left")
  .select(df2("id"), coalesce(df1("name"), df2("name")).as("name"), df2("salary"))
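To try this end to end, here is a minimal sketch that rebuilds the question's sample data and runs the coalesce join. The local SparkSession setup, the app name, the `.as("name")` alias and the `orderBy` are assumptions added for the demo; they are not part of the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

// Local session for experimentation only (assumed setup; in spark-shell `spark` already exists).
val spark = SparkSession.builder().master("local[*]").appName("coalesce-join").getOrCreate()
import spark.implicits._

// Sample data copied from the question.
val df1 = Seq((0, "Mike"), (1, "James")).toDF("id", "name")
val df2 = Seq((0, "M", 10), (1, "J", 20), (2, "K", 30)).toDF("id", "name", "salary")

// df2 is on the left, so id 2 survives the join even though df1 has no match for it.
val result = df2.join(df1, df1("id") === df2("id"), "left")
  .select(
    df2("id"),
    coalesce(df1("name"), df2("name")).as("name"), // prefer df1's name, fall back to df2's
    df2("salary"))
  .orderBy("id") // sorted only to make the printed output stable

result.show()
```

With this data the result should contain the three rows from the question's expected table: Mike and James come from df1, and K falls back to df2 because df1 has no id 2.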
Tzach Zohar answered Nov 14 '22 22:11


To replace null values with a fixed default, you can use DataFrameNaFunctions, as shown below:

// df2 on the left keeps rows that have no match in df1; their df1 name is null
df2.join(df1, df1("id")===df2("id"), "left_outer")
  .select(df2("id"), df1("name"), df2("salary"))
  .na.fill(Map("name" -> "unknown")).show()

Here 'unknown' is just a sample value. You can replace it with whatever value you want.

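If you don't want rows whose name column is null, filter them out:

// df2 on the left, so ids missing from df1 show up with a null name
val joined = df2.join(df1, df1("id")===df2("id"), "left_outer")
    .select(df2("id"), df1("name"), df2("salary"))

// keep only the rows whose name is not null
val result = joined.where(joined.col("name").isNotNull)
result.show()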

Also note that, as mentioned in @Tzach Zohar's answer, the function def coalesce(e: Column*)

Returns the first column that is not null, or null if all inputs are null.

If you are looking for that kind of ... then you can go ahead.
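Because coalesce yields null when every input is null, one option is to append a literal as the last argument so a default is used in that case. The sketch below assumes hypothetical data in which id 2 has no name in either DataFrame; the session setup and the "unknown" default are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit}

// Local session for experimentation only (assumed setup).
val spark = SparkSession.builder().master("local[*]").appName("coalesce-default").getOrCreate()
import spark.implicits._

// Hypothetical data: id 2 has a null name in df2 and no row at all in df1.
val df1 = Seq((0, "Mike")).toDF("id", "name")
val df2 = Seq[(Int, Option[String], Int)]((0, Some("M"), 10), (2, None, 30))
  .toDF("id", "name", "salary")

val result = df2.join(df1, df1("id") === df2("id"), "left")
  .select(
    df2("id"),
    // first non-null of: df1's name, df2's name, then the literal default
    coalesce(df1("name"), df2("name"), lit("unknown")).as("name"),
    df2("salary"))
  .orderBy("id")

result.show()
```

Here id 0 gets "Mike" from df1, while id 2 gets "unknown" because both name columns are null for it. This combines the fallback behaviour of coalesce with the default-filling role of na.fill in a single expression.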

Ram Ghadiyaram answered Nov 14 '22 21:11