 

Why does the broadcast timeout still occur, although we set the threshold very low?

Configuration:

Spark version: 3.1.2
spark.dynamicAllocation.enabled = true
spark.sql.adaptive.enabled = true
spark.sql.autoBroadcastJoinThreshold = 10MB
spark.sql.broadcastTimeout = 300
...

Code:

SELECT /*+ MERGE(a, b) */ a.memberId, b.name, a.amount
FROM trade AS a
JOIN member AS b ON a.memberId = b.id
WHERE b.isTrueMember = true;

At first, we didn't use the join hint /*+ MERGE(a, b) */. The size of trade is less than 10MB, so BroadcastHashJoin is selected; this is correct and in line with expectations.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [memberId#9554, name#9630, amount#9532]
   +- BroadcastHashJoin [memberId#9554], [id#9589], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [id=#34501]
      :  +- Filter isnotnull(memberId#9554)
      :     +- FileScan parquet xxx.trade[amount#9532,memberId#9554] Batched: true, DataFilters: [isnotnull(memberId#9554)], Format: Parquet, Location: InMemoryFileIndex[xxx://xxx;xxx;xxx/..., PartitionFilters: [], PushedFilters: [IsNotNull(memberId)], ReadSchema: struct
      +- Project [id#9589, name#9630]
         +- Filter ((isnotnull(isTrueMember#9608) AND (isTrueMember#9608 = true)) AND isnotnull(id#9589))
            +- FileScan parquet xxx.member[id#9589,isTrueMember#9608,name#9630] Batched: true, DataFilters: [isnotnull(isTrueMember#9608), (isTrueMember#9608 = true), isnotnull(id#9589)], Format: Parquet, Location: InMemoryFileIndex[xxx://xxx;xxx;xxx/..., PartitionFilters: [], PushedFilters: [IsNotNull(isTrueMember), EqualTo(isTrueMember,true), IsNotNull(id)], ReadSchema: struct

In my workflow, many downstream tasks are executed after this section of SQL, but if this section of SQL reports an error, the following tasks will not be executed.

The current phenomenon is that this code sometimes reports a broadcast timeout error and sometimes does not. It is very unstable, and stability is very important in our system, so I added the /*+ MERGE(a, b) */ hint to avoid BroadcastHashJoin.
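As an aside, another way to avoid BroadcastHashJoin that we considered (a sketch, not something we have benchmarked) is to disable auto-broadcast at the session level instead of adding a hint:

```sql
-- Sketch: setting the threshold to -1 disables auto-broadcast for this session,
-- so Spark falls back to a shuffle-based join (sort-merge by default).
SET spark.sql.autoBroadcastJoinThreshold = -1;

SELECT a.memberId, b.name, a.amount
FROM trade AS a
JOIN member AS b ON a.memberId = b.id
WHERE b.isTrueMember = true;
```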

My questions:

  • Is BroadcastHashJoin also unstable due to network instability?
  • 10MB looks very small. Why does the broadcast timeout still occur sometimes?

If you need more information, please leave a message in the comment area.

More info:

cluster manager: Kubernetes
cluster located: Aliyun
data located: The data will be cached in Alluxio and asynchronously synchronized to Aliyun OSS

driver and executor config:
    spark.driver.memory: 4g
    spark.driver.memoryOverhead: 4g
    spark.executor.cores: 6
    spark.executor.memory: 23g
    spark.executor.memoryOverhead: 5g
    spark.executor.instances: 2

The trade data is a single parquet file in Alluxio, with no partitions, and its size is 3.2MB.
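For reference, this is how we can double-check the size the planner sees for trade (a sketch; the statistics line may be missing if the table has never been analyzed):

```sql
-- Sketch: inspect the table statistics Spark's planner uses
-- when deciding whether to broadcast.
ANALYZE TABLE trade COMPUTE STATISTICS;
DESCRIBE EXTENDED trade;  -- look for the "Statistics: ... bytes" row in the output
```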

We use the auto scaling feature of Kubernetes, which generally takes 2-3 minutes to take effect.

Thank you in advance!

Guoran Yun asked Oct 17 '25 01:10

1 Answer

Might be related to this.

Essentially, there's an issue in Spark 3.0 with the adaptive query execution engine where the map and broadcast are being submitted at the same time and the map takes all of the resources, slowing down the broadcast. This isn't resolved until Spark 3.2 and it looks like you're running 3.1. I had a similar issue with a few jobs.

Assuming this is the issue, there are a few things you can do:

  1. Upgrade to Spark 3.2 (might not be realistic)
  2. Increase the broadcast timeout, perhaps significantly, if the code is straightforward and you're sure it's just this section with the broadcast join. If doing so stabilizes the job while staying faster than the merge-join version, then this issue is probably the cause.
  3. Disable spark.sql.adaptive.enabled so Spark isn't using the newer AQE - this is also a great test to see if this is causing the issue.
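Options 2 and 3 can be tried per session without redeploying anything. The values below are illustrative, not tuned recommendations:

```sql
-- Option 2: raise the broadcast timeout (in seconds) well above the 300 default.
SET spark.sql.broadcastTimeout = 1200;

-- Option 3: turn off AQE to test whether it is causing the timeouts.
SET spark.sql.adaptive.enabled = false;
```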
Jeremy answered Oct 19 '25 13:10