Configuration:
Spark version: 3.1.2
spark.dynamicAllocation.enabled = true
spark.sql.adaptive.enabled = true
spark.sql.autoBroadcastJoinThreshold = 10MB
spark.sql.broadcastTimeout = 300
...
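For reference, the same settings can be applied programmatically when building the session; a minimal Scala sketch (the app name is just illustrative):

import org.apache.spark.sql.SparkSession

// Sketch: the configuration above, set when the session is created.
val spark = SparkSession.builder()
  .appName("trade-member-join")                          // hypothetical name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
  .config("spark.sql.broadcastTimeout", "300")           // seconds
  .getOrCreate()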
Code:
SELECT /*+ MERGE(a, b) */ a.memberId, b.name, a.amount
FROM trade AS a
JOIN member AS b ON a.memberId = b.id
WHERE b.isTrueMember = true;
At first we do not use the join hint /*+ MERGE(a, b) */. Since trade is smaller than 10MB, BroadcastHashJoin is selected; this is correct and in line with expectations.
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [memberId#9554, name#9630, amount#9532]
+- BroadcastHashJoin [memberId#9554], [id#9589], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [id=#34501]
: +- Filter isnotnull(memberId#9554)
: +- FileScan parquet xxx.trade[amount#9532,memberId#9554] Batched: true, DataFilters: [isnotnull(memberId#9554)], Format: Parquet, Location: InMemoryFileIndex[xxx://xxx;xxx;xxx/..., PartitionFilters: [], PushedFilters: [IsNotNull(memberId)], ReadSchema: struct
+- Project [id#9589, name#9630]
+- Filter ((isnotnull(isTrueMember#9608) AND (isTrueMember#9608 = true)) AND isnotnull(id#9589))
+- FileScan parquet xxx.member[id#9589,isTrueMember#9608,name#9630] Batched: true, DataFilters: [isnotnull(isTrueMember#9608), (isTrueMember#9608 = true), isnotnull(id#9589)], Format: Parquet, Location: InMemoryFileIndex[xxx://xxx;xxx;xxx/..., PartitionFilters: [], PushedFilters: [IsNotNull(isTrueMember), EqualTo(isTrueMember,true), IsNotNull(id)], ReadSchema: struct
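For context, the plan above can be reproduced by explaining the un-hinted query from the Spark session; a small sketch (the variable name is illustrative):

// Sketch: print the physical plan for the un-hinted query.
val unhinted = spark.sql("""
  SELECT a.memberId, b.name, a.amount
  FROM trade AS a
  JOIN member AS b ON a.memberId = b.id
  WHERE b.isTrueMember = true
""")
unhinted.explain()   // prints the AdaptiveSparkPlan / BroadcastHashJoin shown above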
In my workflow, many other tasks run after this piece of SQL, and if this SQL fails, the downstream tasks will not be executed.
The current behavior is that this query sometimes fails with a broadcast timeout error and sometimes does not. It is very unstable, and stability is very important in our system, so I added the /*+ MERGE(a, b) */ hint to avoid BroadcastHashJoin.
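If the goal is only to keep this job from attempting a broadcast at all, a session-level alternative to the per-query hint is to adjust the relevant settings before running the statement; a sketch of the two common knobs:

// Option 1: disable automatic broadcast joins for this session entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Option 2: give the broadcast more time instead (value is in seconds).
spark.conf.set("spark.sql.broadcastTimeout", "1200")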
My questions: why does broadcasting such a small table sometimes time out, and is adding the /*+ MERGE(a, b) */ hint a reasonable way to avoid it?
If you need more information, please leave a message in the comment area.
More info:
cluster manager: Kubernetes
cluster located: Aliyun
data located: the data is cached in Alluxio and asynchronously synchronized to Aliyun OSS
driver and executor config:
spark.driver.memory: 4g
spark.driver.memoryOverhead: 4g
spark.executor.cores: 6
spark.executor.memory: 23g
spark.executor.memoryOverhead: 5g
spark.executor.instances: 2
trade data is a single Parquet file in Alluxio, with no partitions, and its size is 3.2MB.
We use the auto scaling feature of Kubernetes, which generally takes 2-3 minutes to take effect.
Thank you in advance!
Might be related to this.
Essentially, there's an issue in Spark 3.0's adaptive query execution engine where the map stage and the broadcast are submitted at the same time, and the map stage takes all of the resources, slowing down the broadcast. This isn't resolved until Spark 3.2, and it looks like you're running 3.1. I had a similar issue with a few jobs.
Assuming this is the issue, there are a few things you can do:
Disable spark.sql.adaptive.enabled so Spark isn't using the newer AQE - this is also a great test to see whether it is causing the issue; a sketch follows below.
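A sketch of that test, assuming the query is run through spark.sql in the same session:

// Test: disable AQE for the session, then re-run the join above
// and check whether the broadcast timeout still occurs.
spark.conf.set("spark.sql.adaptive.enabled", "false")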