I have constructed two dataframes. How can we join multiple Spark dataframes?
For example: PersonDf and ProfileDf, with a common column personId as the key. How can we get one DataFrame that combines PersonDf and ProfileDf?
You can join two datasets using the join operators with an optional join condition. You can also use SQL mode to join datasets using good ol' SQL. You can specify a join condition (aka join expression) as part of join operators or using where or filter operators.
You can use case classes to prepare a sample dataset. This is optional; for example, you can also get a DataFrame from hiveContext.sql.
import org.apache.spark.sql.functions.col

case class Person(name: String, age: Int, personid: Int)
case class Profile(name: String, personid: Int, profileDescription: String)

val df1 = sqlContext.createDataFrame(
  Person("Bindu", 20, 2) :: Person("Raphel", 25, 5) :: Person("Ram", 40, 9) :: Nil)

val df2 = sqlContext.createDataFrame(
  Profile("Spark", 2, "SparkSQLMaster") :: Profile("Spark", 5, "SparkGuru") :: Profile("Spark", 9, "DevHunter") :: Nil)

// Alias each DataFrame so that columns can be referenced by alias, which improves readability
val df_asPerson = df1.as("dfperson")
val df_asProfile = df2.as("dfprofile")

val joined_df = df_asPerson.join(
  df_asProfile,
  col("dfperson.personid") === col("dfprofile.personid"),
  "inner")

joined_df.select(
  col("dfperson.name"),
  col("dfperson.age"),
  col("dfprofile.name"),
  col("dfprofile.profileDescription"))
  .show
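As mentioned above, the join condition can also be supplied through where (or filter) instead of being passed to join. The following is a minimal sketch of that variant, not part of the original answer. It assumes Spark 2.x or later with a local SparkSession in place of sqlContext, and the object name WhereJoinSketch and the method joinWithWhere are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Same sample schema as in the answer.
case class Person(name: String, age: Int, personid: Int)
case class Profile(name: String, personid: Int, profileDescription: String)

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object WhereJoinSketch {

  // Joins without a condition, then supplies the condition through `where`.
  def joinWithWhere(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val people = Seq(Person("Bindu", 20, 2), Person("Raphel", 25, 5), Person("Ram", 40, 9))
      .toDF().as("dfperson")
    val profiles = Seq(
      Profile("Spark", 2, "SparkSQLMaster"),
      Profile("Spark", 5, "SparkGuru"),
      Profile("Spark", 9, "DevHunter")).toDF().as("dfprofile")

    people.join(profiles)
      .where(col("dfperson.personid") === col("dfprofile.personid"))
      .select(col("dfperson.name"), col("dfperson.age"), col("dfprofile.profileDescription"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for trying this out.
    val spark = SparkSession.builder().master("local[*]").appName("where-join-sketch").getOrCreate()
    joinWithWhere(spark).show()
    spark.stop()
  }
}
```

Both forms express the same inner join; passing the condition directly to join keeps the intent in one place, which is why the answer uses that form.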
A sample temp table approach, which I personally don't like:
The reason to use the registerTempTable(tableName) method on a DataFrame is that, in addition to the Spark-provided DataFrame methods, you can also issue SQL queries through the sqlContext.sql(sqlQuery) method that use that DataFrame as an SQL table. The tableName parameter specifies the name under which the DataFrame is available in those SQL queries. (In Spark 2.x, registerTempTable is deprecated in favor of createOrReplaceTempView.)
df_asPerson.registerTempTable("dfperson")
df_asProfile.registerTempTable("dfprofile")

sqlContext.sql("""
  SELECT dfperson.name, dfperson.age, dfprofile.profileDescription
  FROM dfperson
  JOIN dfprofile ON dfperson.personid = dfprofile.personid
""")
Note:
1) As mentioned by @RaphaelRoth, val resultDf = PersonDf.join(ProfileDf, Seq("personId")) is a good approach for an inner join, since the join column is not duplicated from both sides in the result.
2) A Spark 2.x example was added in another answer, covering the full set of join operations supported by Spark 2.x, with examples and results.
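The Seq-based join from note 1 can be sketched as follows. This is an illustration under assumptions rather than the commenter's exact code: it assumes Spark 2.x or later with a local SparkSession, builds the sample rows from tuples, and leaves out the profile name column so that personId is the only shared column. The names UsingColumnJoinSketch and joinOnPersonId are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object UsingColumnJoinSketch {

  // Inner equi-join on the shared column name.
  def joinOnPersonId(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val personDf = Seq(("Bindu", 20, 2), ("Raphel", 25, 5), ("Ram", 40, 9))
      .toDF("name", "age", "personId")
    val profileDf = Seq((2, "SparkSQLMaster"), (5, "SparkGuru"), (9, "DevHunter"))
      .toDF("personId", "profileDescription")

    // personId appears once in the output instead of once per side.
    personDf.join(profileDf, Seq("personId"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for trying this out.
    val spark = SparkSession.builder().master("local[*]").appName("using-column-join-sketch").getOrCreate()
    val resultDf = joinOnPersonId(spark)
    resultDf.printSchema()
    resultDf.show()
    spark.stop()
  }
}
```

Because the join column is not duplicated, later select or filter calls can refer to personId without an alias prefix.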
Also, an important point about joins: the broadcast function can be used to give a hint; please see my answer.
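For reference, a broadcast hint might look like the sketch below. It is only an illustration and not the code from the linked answer: it assumes Spark 2.x or later with a local SparkSession, treats the profile side as the small one, and uses the hypothetical names BroadcastJoinSketch and joinWithBroadcastHint. The broadcast function comes from org.apache.spark.sql.functions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Hypothetical wrapper object, used only to keep the sketch self-contained.
object BroadcastJoinSketch {

  // Marks the (assumed) small side with a broadcast hint before joining.
  def joinWithBroadcastHint(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val personDf = Seq(("Bindu", 20, 2), ("Raphel", 25, 5), ("Ram", 40, 9))
      .toDF("name", "age", "personId")
    val profileDf = Seq((2, "SparkSQLMaster"), (5, "SparkGuru"), (9, "DevHunter"))
      .toDF("personId", "profileDescription")

    // The hint asks the planner to ship profileDf to every executor
    // rather than shuffling both sides of the join.
    personDf.join(broadcast(profileDf), Seq("personId"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is enough for trying this out.
    val spark = SparkSession.builder().master("local[*]").appName("broadcast-join-sketch").getOrCreate()
    val resultDf = joinWithBroadcastHint(spark)
    resultDf.explain() // inspect the physical plan to see which join strategy was chosen
    resultDf.show()
    spark.stop()
  }
}
```

The hint only pays off when the broadcast side comfortably fits in memory on each executor; the join result itself is the same as without the hint.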