Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is Spark broadcast exchange data size bigger than raw size on join?

I am doing a broadcast join of two tables A and B. B is a cached table created with the following Spark SQL:

create table B as select segment_ids_hash from  stb_ranker.c3po_segments
      where
        from_unixtime(unix_timestamp(string(dayid), 'yyyyMMdd')) >= CAST('2019-07-31 00:00:00.000000000' AS TIMESTAMP)
      and
        segmentid_check('(6|8|10|12|14|371|372|373|374|375|376|582|583|585|586|587|589|591|592|594|596|597|599|601|602|604|606|607|609|610|611|613|615|616)', seg_ids) = true
cache table B

The column 'segment_ids_hash' is of integer type and the result contains 36.4 million records. The cached table size is about 140 MB, as shown below enter image description here

Then I did the join as follows:

select count(*) from A broadcast join B on A.segment_ids_hash = B.segment_ids_hash

enter image description here

Here broadcast exchange data size is about 3.2 GB.

My question is why the broadcast exchange data size (3.2GB) is so much bigger than the raw data size (~140 MB). What are the overheads? Is there any way to reduce the broadcast exchange data size?

Thanks

like image 205
seiya Avatar asked Sep 20 '19 19:09

seiya


People also ask

Why are broadcast joins significantly faster than shuffle joins?

When the broadcasted relation is small enough, broadcast joins are fast, as they require minimal data shuffling. Above a certain threshold however, broadcast joins tend to be less reliable or performant than shuffle-based join algorithms, due to bottlenecks in network and memory usage.

What is the threshold size limit for broadcast join in spark?

Spark also internally maintains a threshold of the table size to automatically apply broadcast joins. The threshold can be configured using spark. sql. autoBroadcastJoinThreshold which is by default 10MB.

What is broadcast Exchange in spark?

BroadcastExchangeExec is a Exchange unary physical operator to collect and broadcast rows of a child relation (to worker nodes).

When should I broadcast join spark?

Broadcast join in spark is preferred when we want to join one small data frame with the large one. the requirement here is we should be able to store the small data frame easily in the memory so that we can join them with the large data frame in order to boost the performance of the join.


1 Answers

Tl; dr: I'm also learning about the source of data size metrics. This one is probably only the estimated size of the operation, it might not reflect the actual size of the data. Don't worry about it too much for now.

Full version:

Update: got back to correct some mistakes. I see that the previous answer was lacking some depth, so I'll try to dig deeper for this as I can (I'm still relatively new to answering question).

Update 2: rephrasing, removed some overly done joke (sry)

Ok, so this thing might be very long but I think this metrics is not really is the direct size of the data.

To begin with, I made a test run for this one to reproduce the results with 200 executors and 4 cores:

enter image description here

This returned this results: enter image description here

Now I see that something is interesting, since the dataSize for my test is around 1.2GB not 3.2 GB, this led me to read the source code of Spark.

When I go to github, I see that the 4 numbers in BroadcastExchange corresponding to this: First link: BroadcastHashJoinExec: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala enter image description here

Data size corresponding to this one:

enter image description here I found the relation val here appear to be a HashedRelationBroadcastMode.

Go to HashedRelation https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala: enter image description here

Since we have Some(Numrows) (it's the number of row of the DF). The match case use case one (line 926:927)

Go back to HashedRelation constructor-y like part: enter image description here

Since the join is for hashed int, the type is not Long => the join use UnsafeHashedRelation

To UnsafeHashedRelation:

enter image description here

Now we go to the place in UnsafeHashedRelation that determine the estimated size, I found this:

enter image description here

Focus on the estimated size, our target is the binaryMap object (later on the code assign map = binaryMap)

Then it go to here: enter image description here

binaryMap is a BytestoBytesMap, which corresponding to here https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Jump to getTotalMemoryConsumption method (the one that get estimatedSize), we got:

enter image description here

enter image description here

This is my current deadend at the moment. Just my two cents, I don't think it is a bug but just the estimated size of the join, and since this is an estimated size, I don't really think it has to be very accurate (yeah but it's weird to be honest in this case because the difference is very big).

In case that you want to continue to play with the dataSize on this one. One approach is to directly impact binaryMap object by modifying the input for its constructor. Look back at this:

enter image description here

There are two variables that can be configured, which is MEMORY_OFFHEAP_ENABLED and BUFFER_PAGE size. Perhaps you can try to experiment with those two configuration during spark-submit. That is also the reason why the BroadcastExec size doesn't change even when you changed the number of executors and cores.

So in conclusion, I think data size is an estimation generated by some fascinating mechanism (This one I'm also waiting for someone with more expertise to explain this as I'm digging into it), not directly the size that you have mentioned in the first image (140 MB). As such, it probably not worth to spend much time to reduce the overhead of this particular metrics.

Some bonus related stuff:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-BroadcastExchangeExec.html

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UnsafeRow.html

like image 68
Long Vu Avatar answered Oct 12 '22 10:10

Long Vu