 

Conditional Join in Spark DataFrame

I am trying to join two DataFrames with a condition.

I have two DataFrames, A and B.

A contains the columns id, m_cd and c_cd; B contains the columns m_cd, c_cd and record.

The conditions are:

  • If m_cd of A is null, then join on c_cd (A.c_cd = B.c_cd)
  • If m_cd of A is not null, then join on m_cd (A.m_cd = B.m_cd)

We can use when() and otherwise() in the withColumn() method of a DataFrame, so is there any way to do the same for a DataFrame join?

I have already done this using a union, but I wanted to know if there is any other option available.

Avijit asked Sep 09 '16



1 Answer

You can use when / otherwise directly in the join condition:

import org.apache.spark.sql.functions.when
import spark.implicits._   // for the $"..." column syntax (spark is the SparkSession)

case class Foo(m_cd: Option[Int], c_cd: Option[Int])

val dfA = spark.createDataset(Array(
    Foo(Some(1), Some(2)),
    Foo(Some(2), Some(3)),
    Foo(None: Option[Int], Some(4))
))

val dfB = spark.createDataset(Array(
    Foo(Some(1), Some(5)),
    Foo(Some(2), Some(6)),
    Foo(Some(10), Some(4))
))

// Join on c_cd when m_cd is null, otherwise join on m_cd
val joinCondition = when($"a.m_cd".isNull, $"a.c_cd" === $"b.c_cd")
    .otherwise($"a.m_cd" === $"b.m_cd")

dfA.as("a").join(dfB.as("b"), joinCondition).show
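
With the sample data above, the output should look roughly like this (row order and the exact rendering of null may vary by Spark version):

+----+----+----+----+
|m_cd|c_cd|m_cd|c_cd|
+----+----+----+----+
|   1|   2|   1|   5|
|   2|   3|   2|   6|
|null|   4|  10|   4|
+----+----+----+----+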

It might still be more readable to use the union, though.
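
For comparison, a union-based version might look roughly like the sketch below. This is not from the original answer; it assumes the same dfA / dfB and aliases defined above.

// Split A on whether m_cd is null, join each part on the matching key,
// then union the two results (the schemas line up by position).
val matchedOnM = dfA.filter($"m_cd".isNotNull).as("a")
    .join(dfB.as("b"), $"a.m_cd" === $"b.m_cd")

val matchedOnC = dfA.filter($"m_cd".isNull).as("a")
    .join(dfB.as("b"), $"a.c_cd" === $"b.c_cd")

matchedOnM.union(matchedOnC).show

One practical reason to prefer the union: each branch is a plain equi-join, which Spark can execute as a hash join, whereas the when-based condition is not an equality predicate and typically falls back to a much slower nested-loop style join on large data.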

alghimo answered Nov 04 '22