I am running the code below on Spark 3.0.1 (CDP cluster):

autoBroadcastJoinThreshold is at its default of 10 MiB and AQE is disabled.


On DataFrame.explain(), it selects SortMergeJoin as expected:

But if I call DataFrame.show() it goes for SortMergeJoin, while DataFrame.count() goes for BroadcastHashJoin.
df.show() --> SortMergeJoin:

df.count() --> BroadcastHashJoin:

I understand that this is because df.count() uses only the join key column for projection in the first stage, so the size of the data from that key column alone is under 10 MiB, and hence it goes for BroadcastHashJoin.

But what I am not understanding is how it does that.
As I understand it, Spark uses runtime statistics only when AQE is enabled; otherwise the Catalyst optimizer uses the input file size to decide the join strategy while preparing the physical plan.
Does .count() recalculate the size like this after reading the file at runtime?
I think you are right: AQE is not doing anything here, and the magic is happening during initial planning.
In Catalyst, statistics are propagated from the leaf nodes to the other nodes. In your case the leaves are JSON file scans.
Let's take a look at them in the Spark source code.
Your leaf is going to be evaluated into JsonScan, so it's this code: JsonScan in the Spark repo.
JsonScan extends TextBasedFileScan, which extends FileScan, and in FileScan we can find the function that estimates the statistics later used during physical planning (Spark source code):
override def estimateStatistics(): Statistics = {
  new Statistics {
    override def sizeInBytes(): OptionalLong = {
      val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
      val size = (compressionFactor * fileIndex.sizeInBytes /
        (dataSchema.defaultSize + fileIndex.partitionSchema.defaultSize) *
        (readDataSchema.defaultSize + readPartitionSchema.defaultSize)).toLong
      OptionalLong.of(size)
    }

    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}
As you can see, numRows is empty, but sizeInBytes is calculated from fileIndex and a few other variables. One of these variables is readDataSchema.defaultSize. In the case of count (as opposed to show), Spark is going to read only the join key column from the second file, so the estimated statistics are lower, and later Catalyst chooses BroadcastHashJoin instead of SMJ, because based on those statistics it will be able to broadcast one side of your join.
In the source code you can also find how Spark chooses the side that will be broadcast and how it compares the size from the statistics with the Spark parameters.
You don't have a hint, so Spark is going to execute this (SparkStrategies.scala):
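To make that formula concrete, here is a back-of-the-envelope sketch of the sizeInBytes estimate (ignoring partition columns for simplicity). The file size and schema widths are made-up numbers, not taken from your job, and the default compression factor of 1.0 is assumed:

```scala
// Sketch of FileScan.estimateStatistics' sizeInBytes formula: the on-disk
// size is scaled by the ratio of the read (pruned) schema's default row
// width to the full schema's default row width.
object SizeEstimateSketch {
  def estimatedSize(fileSizeInBytes: Long,
                    dataSchemaDefaultSize: Long,     // full schema row width
                    readDataSchemaDefaultSize: Long  // pruned schema row width
                   ): Long = {
    val compressionFactor = 1.0 // spark.sql.sources.fileCompressionFactor default
    (compressionFactor * fileSizeInBytes / dataSchemaDefaultSize *
      readDataSchemaDefaultSize).toLong
  }

  def main(args: Array[String]): Unit = {
    val fileSize  = 100L * 1024 * 1024 // 100 MiB of JSON on disk (made up)
    val fullWidth = 200L               // e.g. many wide columns
    val keyWidth  = 8L                 // a single long join key
    // show reads all columns, count only needs the join key:
    println(estimatedSize(fileSize, fullWidth, fullWidth)) // 104857600 (100 MiB)
    println(estimatedSize(fileSize, fullWidth, keyWidth))  // 4194304 (4 MiB)
  }
}
```

With the full schema the estimate stays at 100 MiB (well over the 10 MiB threshold), but with only the key column read it drops to roughly 4 MiB, which is exactly the kind of difference that flips the plan from SMJ to a broadcast join.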
def createJoinWithoutHint() = {
  createBroadcastHashJoin(false)
    .orElse(createShuffleHashJoin(false))
    .orElse(createSortMergeJoin())
    .orElse(createCartesianProduct())
    .getOrElse {
      // This join could be very slow or OOM
      val buildSide = getSmallerSide(left, right)
      Seq(joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, j.condition))
    }
}
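The orElse chain here is ordinary Scala: each create* helper returns an Option, and the first one that returns Some wins. A stripped-down sketch of the same pattern, with placeholder strategy names and boolean conditions instead of Spark's real types and checks:

```scala
// Minimal sketch of the orElse fallback chain: each candidate strategy
// returns Some(plan) if it applies, None otherwise; the first Some wins,
// and getOrElse supplies the last-resort plan.
object JoinFallbackSketch {
  def createBroadcastHashJoin(canBroadcast: Boolean): Option[String] =
    if (canBroadcast) Some("BroadcastHashJoin") else None
  def createShuffleHashJoin(preferShuffleHash: Boolean): Option[String] =
    if (preferShuffleHash) Some("ShuffledHashJoin") else None
  def createSortMergeJoin(keysSortable: Boolean): Option[String] =
    if (keysSortable) Some("SortMergeJoin") else None

  def pick(canBroadcast: Boolean, preferShuffleHash: Boolean,
           keysSortable: Boolean): String =
    createBroadcastHashJoin(canBroadcast)
      .orElse(createShuffleHashJoin(preferShuffleHash))
      .orElse(createSortMergeJoin(keysSortable))
      .getOrElse("BroadcastNestedLoopJoin") // last resort: may be slow or OOM
}
```

So as soon as the broadcast check succeeds, none of the later strategies are even considered, which is why a lower size estimate alone is enough to change the chosen join.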
First it will try createBroadcastHashJoin, which checks whether it can broadcast one side of the join.
During this check, Spark executes the function below, first for the left side and then, if the left side is too big to broadcast, for the right side (joins.scala):
def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
  val autoBroadcastJoinThreshold = if (plan.stats.isRuntime) {
    conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
      .getOrElse(conf.autoBroadcastJoinThreshold)
  } else {
    conf.autoBroadcastJoinThreshold
  }
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold
}
Here, most probably, for one side of your join plan.stats.sizeInBytes will be filled with the value propagated from the JsonScan, which is smaller than the parameter, and BroadcastHashJoin is going to be used in the physical plan.
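Without AQE (so stats are not runtime stats), the check above reduces to a simple comparison of the propagated estimate against spark.sql.autoBroadcastJoinThreshold. A standalone sketch of just that comparison, with the 10 MiB default hard-coded and made-up size estimates:

```scala
// Sketch of the non-AQE canBroadcastBySize check: a side is broadcastable
// when its estimated size is non-negative and at or below the threshold.
object BroadcastCheckSketch {
  val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024 // 10 MiB default

  def canBroadcastBySize(sizeInBytes: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= autoBroadcastJoinThreshold

  def main(args: Array[String]): Unit = {
    println(canBroadcastBySize(100L * 1024 * 1024)) // full-schema estimate: false -> SMJ
    println(canBroadcastBySize(4L * 1024 * 1024))   // key-only estimate: true -> BHJ
  }
}
```

The >= 0 guard matters because a missing estimate is represented as a negative/unknown size, which must never qualify a side for broadcast.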