Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark

I execute a join using a javaHiveContext in Spark.

The big table is 1,76Gb and has 100 millions record.

The second table is 273Mb and has 10 millions record.

I get a JavaSchemaRDD and I call count() on it:

String query="select attribute7,count(*) from ft,dt where ft.chiavedt=dt.chiavedt group by attribute7";

JavaSchemaRDD rdd=sqlContext.sql(query);

System.out.println("count="+rdd.count());

If I force a broadcastHashJoin (SET spark.sql.autoBroadcastJoinThreshold=290000000) and use 5 executor on 5 node with 8 core and 20Gb of memory it is executed in 100 sec. If i don't force broadcast it is executed in 30 sec.

N.B. the tables are stored as Parquet file.

like image 509
Fabio Avatar asked Dec 07 '15 16:12

Fabio


People also ask

Why are broadcast joins significantly faster than shuffle joins?

Broadcast joins are faster than shuffle joins because they do not require an expensive shuffle step, instead passing the full RDD to each executor as a broadcast variable.

How does shuffle hash join work in Spark?

Shuffle Hash Join, as the name indicates works by shuffling both datasets. So the same keys from both sides end up in the same partition or task. Once the data is shuffled, the smallest of the two will be hashed into buckets and a hash join is performed within the partition.

How do you reduce shuffling in performing a join of two Spark datasets?

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor.


1 Answers

Most likely the source of the problem is a cost of broadcasting. To make things simple lets assume that you have 1800MB in the larger RDD and 300MB in the smaller one. Assuming 5 executors and no previous partitioning a fifth of all data should be already on the correct machine. It lefts ~1700MB for shuffling in case of standard join.

For broadcast join the smaller RDD has to be transfered to all nodes. It means around 1500MB data to be transfered. If you add required communication with driver it means you have to move a comparable amount of data in a much more expensive way. A broadcasted data has to be collected first and only after that can be forwarded to all the workers.

like image 76
zero323 Avatar answered Oct 28 '22 04:10

zero323