How do I force a full outer join between DataFrames in Spark to use Broadcast Hash Join? Here is the code snippet:
sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
org.apache.spark.sql.functions.broadcast(SmallTable),
Seq("X", "Y", "Z", "W", "V"),
"outer"
)
My SmallTable's size is way smaller than the autoBroadcastJoinThreshold specified above. Also, if I use an inner, left_outer, or right_outer join, I see from the DAG visualization that the join uses BroadcastHashJoin, as expected.
However, when I use "outer" as the join type, Spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem? Based on the performance I see with the left outer join, BroadcastHashJoin would speed up my application several-fold.
Reason: FullOuter (meaning any of the keywords outer, full, or fullouter) won't support broadcast hash join (aka map-side join).
How to prove this? Let's look at an example:
package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Join Example and some basics demonstration using sample data.
 *
 * @author : Ram Ghadiyaram
 */
object JoinExamples extends Logging {
  // switch off unnecessary logs
  Logger.getLogger("org").setLevel(Level.OFF)
  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate

  case class Person(name: String, age: Int, personid: Int)

  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
   * main
   *
   * @param args Array[String]
   */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._
    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))

    // create 2 dataframes here using case classes: one is Person (df1) and the other is Profile (df2)
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))
    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )
    // alias the dataframes so columns can be referenced by alias, which increases readability
    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")

    // this example shows how to join at the dataframe level;
    // the same could also be done with sql and createOrReplaceTempView
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile)
      , col("dfperson.personid") === col("dfprofile.personid")
      , "outer")
    val joined = joined_df.select(
      col("dfperson.name")
      , col("dfperson.age")
      , col("dfprofile.name")
      , col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show
  }
}
I tried to use the broadcast hint for the fullouter join, but the framework ignores it and takes SortMergeJoin. Below is the explain plan for this.

Result:
== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personid#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan[obj#3]
   +- *Sort [personid#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personid#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]

+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+
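For contrast, a minimal sketch of the one-line change: switching the join type in the same example from "outer" to "left_outer" should make the physical plan show BroadcastHashJoin, matching what the question observed.

val joinedLeft = df_asPerson.join(
  broadcast(df_asProfile)
  , col("dfperson.personid") === col("dfprofile.personid")
  , "left_outer")
// the plan should now show BroadcastHashJoin instead of SortMergeJoin
joinedLeft.explain(false)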
From Spark 2.3 onward, sort-merge join is the default join algorithm in Spark. However, this preference can be turned off using the internal parameter spark.sql.join.preferSortMergeJoin, which is true by default.
For any case other than the fullouter join: if you don't want Spark to use SortMergeJoin, you can set the property below.
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
This instructs SparkStrategies.scala (which is responsible for converting a logical plan into zero or more SparkPlans) that you don't want to use SortMergeJoin.
The property spark.sql.join.preferSortMergeJoin (PREFER_SORTMERGEJOIN): when true, prefer sort merge join over shuffle hash join. Setting it to false does not mean Spark can only select BroadcastHashJoin; it can pick anything else as well (for example, shuffle hash join).
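To illustrate, a minimal sketch (dfA and dfB are assumed DataFrames that are above the broadcast threshold):

// with the preference switched off, JoinSelection is free to pick
// ShuffledHashJoin instead of SortMergeJoin, provided one side is small
// enough to build a per-partition hash table
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
val joined = dfA.join(dfB, Seq("personid"), "inner")
joined.explain(false) // inspect which join the physical plan actually chose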
The doc below is in SparkStrategies.scala, i.e. on top of object JoinSelection extends Strategy with PredicateHelper:

Broadcast: if one side of the join has an estimated physical size that is smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold, or if that side has an explicit broadcast hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side of the join will be broadcasted and the other side will be streamed, with no shuffling performed. If both sides of the join are eligible to be broadcasted then the smaller side will be broadcasted.

Shuffle hash join: if the average size of a single partition is small enough to build a hash table.

Sort merge: if the matching join keys are sortable.
Broadcast join doesn't support a full outer join. It only supports the following types:

InnerLike | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin | RightOuter

Please see JoinSelection in SparkStrategies.scala for details.
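For reference, here is a paraphrased sketch of that eligibility check, based on JoinSelection in Spark 2.x's SparkStrategies.scala (the exact shape varies by version). FullOuter falls through to the default case in both checks, so neither side can ever be chosen as the broadcast build side:

import org.apache.spark.sql.catalyst.plans._

// which join types allow building (broadcasting) the right side
def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}

// which join types allow building (broadcasting) the left side
def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

// FullOuter matches neither check, so BroadcastHashJoin is never selected for it.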