Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

perform join on multiple DataFrame in spark

I have 3dataframes generated from 3 different processes. Every dataframe is having columns of same name. My dataframe looks like this

id   val1    val2       val3    val4
 1    null   null       null    null
 2    A2      A21       A31      A41

id   val1      val2       val3      val4
 1    B1        B21        B31       B41
 2    null      null       null      null

id   val1     val2       val3    val4
 1    C1       C2        C3       C4
 2    C11      C12       C13      C14

Out of these 3 dataframes, i want to create two dataframes, (final and consolidated). For final, order of preferences - dataFrame 1 > Dataframe 2 > Dataframe 3

If a result is there in dataframe 1(val1 != null), i will store that row in final dataframe.

My final result should be :

id  finalVal1    finalVal2   finalVal3   finalVal4 
1     B1           B21         B31         B41
2     A2           A21         A31         A41

Consolidated Dataframe will store results from all 3.

How can i do that efficiently?

like image 916
Neha Avatar asked Oct 11 '16 11:10

Neha


1 Answers

If I understood you correctly, for each row you want to find out the first non-null values, first by looking into the first table, then the second table, then the third table.

You simply need to join these three tables based on the id and then use the coalesce function to get the first non-null element

import org.apache.spark.sql.functions._

val df1 = sc.parallelize(Seq(
    (1,null,null,null,null),
    (2,"A2","A21","A31", "A41"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df2 = sc.parallelize(Seq(
    (1,"B1","B21","B31", "B41"),
    (2,null,null,null,null))
  ).toDF("id", "val1", "val2", "val3", "val4")

val df3 = sc.parallelize(Seq(
    (1,"C1","C2","C3","C4"),
    (2,"C11","C12","C13", "C14"))
  ).toDF("id", "val1", "val2", "val3", "val4")

val consolidated = df1.join(df2, "id").join(df3, "id").select(
  df1("id"),
  coalesce(df1("val1"), df2("val1"), df3("val1")).as("finalVal1"),
  coalesce(df1("val2"), df2("val2"), df3("val2")).as("finalVal2"),
  coalesce(df1("val3"), df2("val3"), df3("val3")).as("finalVal3"),
  coalesce(df1("val4"), df2("val4"), df3("val4")).as("finalVal4")
)

Which gives you the expected output

+---+----+----+----+----+
| id|val1|val2|val3|val4|
+---+----+----+----+----+
|  1|  B1| B21| B31| B41|
|  2|  A2| A21| A31| A41|
+---+----+----+----+----+
like image 195
cheseaux Avatar answered Sep 26 '22 10:09

cheseaux