Broadcast Hash Join (BHJ) in Spark for full outer join (outer, full, fullouter)

How do I force a full outer join on DataFrames in Spark to use Broadcast Hash Join? Here is the code snippet:

sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
  org.apache.spark.sql.functions.broadcast(SmallTable),
  Seq("X", "Y", "Z", "W", "V"),
  "outer"
)

My SmallTable is far smaller than the autoBroadcastJoinThreshold specified above. Also, if I use an inner, left_outer, or right_outer join, the DAG visualization shows that the join uses BroadcastHashJoin as expected.

However, when I use "outer" as the join type, Spark chooses SortMergeJoin for some unknown reason. Does anyone know how to solve this problem? Based on the performance I see with the left outer join, BroadcastHashJoin would speed up my application several-fold.

KangarooWest asked Apr 25 '17 23:04


2 Answers

spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem?

Reason: a FullOuter join (that is, any of the keywords outer, full, fullouter) does not support broadcast hash join (also known as map-side join).

How can this be demonstrated?

Here is an example:

package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
  * Join Example and some basics demonstration using sample data.
  *
  * @author : Ram Ghadiyaram
  */
object JoinExamples extends Logging {
  // switch off unnecessary logs
  Logger.getLogger("org").setLevel(Level.OFF)
  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
  case class Person(name: String, age: Int, personid: Int)

  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
    * main
    *
    * @param args Array[String]
    */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._

    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))
    /**
      * create 2 dataframes here using case classes one is Person df1 and another one is profile df2
      */
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))


    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )

    // alias the DataFrames so columns can be referenced by alias, which improves readability

    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")
    /** *
      * Example displays how to join them in the dataframe level
      * next example demonstrates using sql with createOrReplaceTempView
      */
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile)
      , col("dfperson.personid") === col("dfprofile.personid")
      , "outer")
    val joined = joined_df.select(
      col("dfperson.name")
      , col("dfperson.age")
      , col("dfprofile.name")
      , col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show

  }

}

I tried to use the broadcast hint for the full outer join, but the framework ignores it and uses SortMergeJoin. Below is the explain plan, followed by the result:

== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personid#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan[obj#3]
   +- *Sort [personid#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personid#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]
+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+
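
For contrast, here is a minimal sketch that builds the same kind of join with several join types and checks whether the physical plan mentions BroadcastHashJoin. It assumes Spark is on the classpath and runs with a local master; the object name `JoinStrategyCheck`, the helper `planFor`, and the sample data are hypothetical and not part of the example above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Hypothetical helper object, for illustration only.
object JoinStrategyCheck {
  val spark: SparkSession = SparkSession.builder
    .master("local[1]")
    .appName("JoinStrategyCheck")
    .getOrCreate()

  import spark.implicits._

  // Tiny stand-ins for the "big" and "small" tables.
  val big: DataFrame =
    Seq((2, "Sarath"), (5, "Ravikumar"), (9, "Ram")).toDF("personid", "name")
  val small: DataFrame =
    Seq((2, "SparkSQLMaster"), (9, "DevHunter")).toDF("personid", "profileDescription")

  /** Physical plan text for `big JOIN broadcast(small)` with the given join type. */
  def planFor(joinType: String): String =
    big.join(broadcast(small), Seq("personid"), joinType)
      .queryExecution.executedPlan.toString

  def main(args: Array[String]): Unit = {
    // Report, per join type, whether the plan contains a BroadcastHashJoin node.
    Seq("inner", "left_outer", "outer").foreach { joinType =>
      val usesBhj = planFor(joinType).contains("BroadcastHashJoin")
      println(s"$joinType -> BroadcastHashJoin: $usesBhj")
    }
    spark.stop()
  }
}
```

Going by the behaviour described in the question, one would expect the inner and left_outer plans to contain BroadcastHashJoin and the outer plan not to, but the exact plan text can vary between Spark versions.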

As of Spark 2.3, sort-merge join is the default join algorithm in Spark. However, this preference can be turned off with the internal parameter spark.sql.join.preferSortMergeJoin, which is true by default.

For cases other than a full outer join: if you do not want Spark to prefer sort-merge join, you can set the property below.

sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

This tells the code in SparkStrategies.scala (which is responsible for converting a logical plan into zero or more SparkPlans) that you do not want sort-merge join to be preferred.

Important note:

The property spark.sql.join.preferSortMergeJoin (PREFER_SORTMERGEJOIN), when true, makes Spark prefer sort-merge join over shuffle hash join.

Setting it to false does not mean Spark will necessarily select broadcast hash join; it can choose another strategy as well (for example, shuffle hash join).

The documentation below comes from SparkStrategies.scala, where it sits above object JoinSelection extends Strategy with PredicateHelper ...


  • Broadcast: if one side of the join has an estimated physical size that is smaller than the user-configurable [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold or if that side has an explicit broadcast hint (e.g. the user applied the [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame), then that side of the join will be broadcasted and the other side will be streamed, with no shuffling performed. If both sides of the join are eligible to be broadcasted then the
  • Shuffle hash join: if the average size of a single partition is small enough to build a hash table.

  • Sort merge: if the matching join keys are sortable.

Ram Ghadiyaram answered Nov 12 '22 03:11


Broadcast hash join doesn't support full outer join. It only supports the following join types:

InnerLike | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin | RightOuter

Please see the JoinStrategy for details.
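
As an intuition for why full outer is missing from that list, here is a plain-Scala sketch with no Spark dependency. It is only a simplified model, not Spark's implementation, and every name in it (`BroadcastFullOuterSketch`, `joinOnePartition`, `demo`) is hypothetical. Each task holds the whole broadcast table but only one partition of the big table, so it cannot tell whether a small-table row is unmatched globally or only unmatched in its own partition.

```scala
// Hypothetical model of a map-side hash join, for illustration only.
object BroadcastFullOuterSketch {
  // (key, value from the big side, value from the small side); None marks a missing side.
  type Joined = (Int, Option[String], Option[String])

  /** What one task can compute: one partition of the big side plus the full broadcast small side. */
  def joinOnePartition(bigPartition: Seq[(Int, String)], small: Map[Int, String]): Seq[Joined] = {
    // Left-outer part: safe, because every big row lives in exactly one partition.
    val leftOuter: Seq[Joined] =
      bigPartition.map { case (k, v) => (k, Some(v), small.get(k)) }
    // Naive full-outer part: small rows with no match in THIS partition only.
    val keysSeenHere = bigPartition.map(_._1).toSet
    val unmatchedHere: Seq[Joined] =
      small.toSeq.collect { case (k, v) if !keysSeenHere(k) => (k, None, Some(v)) }
    leftOuter ++ unmatchedHere
  }

  /** Runs the naive per-partition join over two partitions of the big side. */
  def demo(): Seq[Joined] = {
    val small = Map(2 -> "SparkSQLMaster", 7 -> "Orphan")
    val bigPartitions = Seq(Seq(2 -> "Sarath"), Seq(5 -> "Ravikumar"))
    bigPartitions.flatMap(joinOnePartition(_, small))
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

In this model a correct full outer join has three rows, but the per-partition version returns five: key 7 is emitted once per partition, and key 2 is reported as unmatched by the partition that does not contain it. Left outer and inner joins avoid the problem because they never emit unmatched rows from the broadcast side.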

Guitao answered Nov 12 '22 05:11