I am doing a broadcast join of two tables A and B. B is a cached table created with the following Spark SQL:
create table B as select segment_ids_hash from stb_ranker.c3po_segments
where
from_unixtime(unix_timestamp(string(dayid), 'yyyyMMdd')) >= CAST('2019-07-31 00:00:00.000000000' AS TIMESTAMP)
and
segmentid_check('(6|8|10|12|14|371|372|373|374|375|376|582|583|585|586|587|589|591|592|594|596|597|599|601|602|604|606|607|609|610|611|613|615|616)', seg_ids) = true
cache table B
The column 'segment_ids_hash' is of integer type, and the result contains 36.4 million records. The cached table size is about 140 MB, as shown below.
Then I did the join as follows:
select /*+ BROADCAST(B) */ count(*) from A join B on A.segment_ids_hash = B.segment_ids_hash
Here the broadcast exchange data size is about 3.2 GB.
My question is why the broadcast exchange data size (3.2 GB) is so much bigger than the raw data size (~140 MB). What are the overheads? Is there any way to reduce the broadcast exchange data size?
Thanks
When the broadcast relation is small enough, broadcast joins are fast, as they require minimal data shuffling. Above a certain size, however, broadcast joins tend to be less reliable and slower than shuffle-based join algorithms because of network and memory bottlenecks.
Spark also maintains an internal table-size threshold below which it applies broadcast joins automatically. The threshold is configured with spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB.
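As a rough illustration of that rule, the decision boils down to comparing a table's estimated size against the threshold. The helper below is a hypothetical stand-in written for this example, not Spark's planner code; it assumes the documented 10 MB default and the documented convention that a value of -1 disables automatic broadcasting.

```python
# Default of spark.sql.autoBroadcastJoinThreshold: 10 MB, expressed in bytes.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024


def would_auto_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Hypothetical helper: True if a table of this estimated size qualifies."""
    if threshold_bytes < 0:  # -1 turns automatic broadcast joins off
        return False
    return estimated_size_bytes <= threshold_bytes


print(would_auto_broadcast(5 * 1024 * 1024))    # a 5 MB table
print(would_auto_broadcast(140 * 1024 * 1024))  # the 140 MB cached table B
```

Under the default threshold, the 140 MB table B from the question would not qualify on its own, so a broadcast has to be requested explicitly.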
BroadcastExchangeExec is an Exchange unary physical operator that collects the rows of a child relation and broadcasts them to worker nodes.
A broadcast join in Spark is preferred when joining one small data frame with a large one. The requirement is that the small data frame fits comfortably in memory, so that it can be joined with the large data frame without a shuffle, which speeds up the join.
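The idea can be sketched in a few lines of plain Python: build a hash table from the small side once, then stream the large side past it. This is a toy, single-process model of a broadcast hash join, not how Spark implements it; the function name and the sample lists are invented for illustration.

```python
from collections import defaultdict


def broadcast_hash_join_count(large_rows, small_rows):
    """Count matching pairs, like select count(*) over an inner equi-join."""
    # Build phase: hash the small side once (this is the part that is broadcast).
    build = defaultdict(int)
    for key in small_rows:
        build[key] += 1
    # Probe phase: stream the large side and look each key up in the hash table.
    return sum(build.get(key, 0) for key in large_rows)


a = [1, 2, 2, 3, 5]  # stand-in for A.segment_ids_hash
b = [2, 3, 3, 4]     # stand-in for B.segment_ids_hash
print(broadcast_hash_join_count(a, b))
```

The hash table built in the first phase has to fit in memory on every executor, which is why the size of the small side matters so much.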
TL;DR: I'm also still learning where the data size metric comes from. It is probably only an estimated size for the operation and may not reflect the actual size of the data. Don't worry about it too much for now.
Full version:
Update: I came back to correct some mistakes. The previous answer lacked depth, so I'll try to dig as deep as I can (I'm still relatively new to answering questions).
Update 2: rephrased and removed an overdone joke (sorry).
OK, this might get long, but I think this metric is not really the direct size of the data.
To begin with, I did a test run to reproduce the results, using 200 executors and 4 cores:
It returned these results:
Here is something interesting: the dataSize in my test is around 1.2 GB, not 3.2 GB. This led me to read the Spark source code.
On GitHub, I see that the four numbers shown for BroadcastExchange come from this file. First link, BroadcastExchangeExec: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
The data size corresponds to this part:
The relation val here appears to be a HashedRelationBroadcastMode.
Go to HashedRelation: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
Since we have Some(numRows) (the number of rows in the DataFrame), the match takes the first case (lines 926-927).
Go back to the constructor-like part of HashedRelation:
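Since the join key is a hashed int, its type is not Long, so the join uses UnsafeHashedRelation. (One caveat I have not verified for this plan: Spark can rewrite small integral join keys to a Long before building the relation, in which case LongHashedRelation would be chosen instead.)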
On to UnsafeHashedRelation:
Looking for the place in UnsafeHashedRelation that determines the estimated size, I found this:
For the estimated size, our target is the binaryMap object (later on, the code assigns map = binaryMap).
Then it goes here:
binaryMap is a BytesToBytesMap, which corresponds to https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Jumping to the getTotalMemoryConsumption method (the one that supplies estimatedSize), we get:
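As a rough model of what that method appears to compute: it adds up the sizes of all allocated data pages and the size of the hash table's pointer array. The Python sketch below is a toy restatement with made-up numbers, not Spark's code; the function name and the page and array sizes are assumptions chosen only for illustration.

```python
def total_memory_consumption(data_page_sizes, pointer_array_bytes):
    """Toy model: allocated data pages plus the hash table's pointer array."""
    return sum(data_page_sizes) + pointer_array_bytes


# Hypothetical numbers, chosen only for illustration.
page = 16 * 1024 * 1024          # one 16 MiB page
pages = [page, page, page]       # the last page is counted even if mostly empty
pointer_array = 8 * 1024 * 1024  # 8 MiB of hash table slots

print(total_memory_consumption(pages, pointer_array))
```

The point of the model is that whole pages are counted whether or not they are full, so the figure describes allocated memory rather than the bytes of column data.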
This is where I am stuck at the moment. Just my two cents: I don't think it is a bug, only the estimated size of the join, and since it is an estimate, I don't think it has to be very accurate (although, to be honest, it is odd in this case because the difference is so large).
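To put the gap in numbers, here is a back-of-the-envelope calculation using only the figures from the question. The 16-byte row size is an assumption based on my understanding of the UnsafeRow layout for a single fixed-width column (an 8-byte null bitmap plus one 8-byte slot per field); it ignores whatever the hash map stores around each row.

```python
rows = 36_400_000          # record count from the question
raw_int_bytes = 4          # a 32-bit integer
unsafe_row_bytes = 8 + 8   # assumption: 8-byte null bitmap + one 8-byte field slot

raw_mib = rows * raw_int_bytes / 2**20
reported_bytes_per_row = 3.2e9 / rows
unsafe_row_mib = rows * unsafe_row_bytes / 2**20

print(round(raw_mib, 1))                 # raw column size in MiB
print(round(reported_bytes_per_row, 1))  # reported broadcast bytes per row
print(round(unsafe_row_mib, 1))          # one UnsafeRow per record, in MiB
```

The raw size comes out at roughly 139 MiB, which matches the cached table, while the reported 3.2 GB works out to roughly 88 bytes per row. Row format alone would account for a 4x increase under this assumption; the rest would have to come from the hash map's own bookkeeping and page allocation.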
In case you want to keep experimenting with the dataSize here, one approach is to influence the binaryMap object directly by changing the inputs to its constructor. Look back at this:
There are two variables that can be configured: MEMORY_OFFHEAP_ENABLED and BUFFER_PAGESIZE. Perhaps you can experiment with those two settings during spark-submit. That would also explain why the BroadcastExchange data size doesn't change even when you change the number of executors and cores.
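For such an experiment, the two constants map, as far as I know, to the configuration keys spark.memory.offHeap.enabled and spark.buffer.pageSize. The small helper below only assembles the spark-submit arguments; the helper name and all values are placeholders for experimentation, not recommendations. Note that enabling off-heap memory also requires a positive spark.memory.offHeap.size.

```python
def conf_args(settings):
    """Turn a dict of Spark settings into spark-submit --conf arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Placeholder values for experimentation, not recommendations.
settings = {
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "2g",  # must be positive when off-heap is enabled
    "spark.buffer.pageSize": "1m",      # assumed key behind BUFFER_PAGESIZE
}

print(" ".join(["spark-submit"] + conf_args(settings)))
```

Comparing the reported data size across a few page sizes would show how much of the figure is page allocation rather than data.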
So, in conclusion, I think the data size is an estimate produced by some internal mechanism (I'm also waiting for someone with more expertise to explain it while I keep digging), not directly the size you mentioned in the first image (140 MB). As such, it is probably not worth spending much time trying to reduce the overhead behind this particular metric.
Some related bonus material:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-BroadcastExchangeExec.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UnsafeRow.html