I am trying to understand the query plans generated by Spark SQL (2.4). I have the following query and its corresponding query plan (below). (The query is just a test query.)
create temporary view tab_in as
select distinct
mth_id
from tgt_tbl;
select /*+ BROADCAST(c) */
a.mth_id,
a.qtr_id,
a.prod_id,
a.sale_date
from my_table a
left anti join tab_in c
on a.mth_id = c.mth_id;
Explain plan:
+- *(3) Project [mth_id#652, qtr_id#653, prod_id#655, sale_dt#656]
+- *(3) BroadcastHashJoin [mth_id#652], [mth_id#867], LeftAnti, BuildRight
:- *(3) Project [mth_id#652, qtr_id#653, sale_dt#656, prod_id#655]
: +- *(3) FileScan parquet test_db.my_table[mth_id#652, qtr_id#653, prod_id#655, sale_dt#656] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://test-data/my_table/0], PartitionCount: 1, PartitionFilters: [], PushedFilters: [], ReadSchema: ......
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
+- *(2) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867])
+- Exchange hashpartitioning(mth_id#867, 200)
+- *(1) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867])
+- *(1) Project [mth_id#867]
+- *(1) Filter isnotnull(mth_id#867)
+- *(1) FileScan parquet test_db.my_table[mth_id#867] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test-data/tgt_tbl/20200609], PartitionFilters: [], PushedFilters: [IsNotNull(mth_id)], ReadSchema: struct<mth_id:int>
As can be seen above, there are two HashAggregates in the plan - one before and one after the Exchange hashpartitioning. I figured that the first HashAggregate is probably due to the DISTINCT clause in the first query, but I cannot work out the reason for the second HashAggregate (the one after the Exchange).
I have tried combining the two queries into one by putting the first query in a WITH (CTE) clause, but I still got the same plan.
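For reference, the combined form described above would look roughly like this (a sketch reassembled from the two statements; table and column names are taken from the original queries):

```sql
with tab_in as (
  select distinct mth_id
  from tgt_tbl
)
select /*+ BROADCAST(c) */
  a.mth_id,
  a.qtr_id,
  a.prod_id,
  a.sale_date
from my_table a
left anti join tab_in c
  on a.mth_id = c.mth_id;
```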
Can someone please explain the need for the second (reading from the bottom up) HashAggregate?
Any help is appreciated. Thanks.
Both HashAggregates in the plan are there because of the deduplication (DISTINCT). HashAggregate usually comes in a pair: the first one performs partial (local) deduplication on each executor, within each partition. Then comes the Exchange - the data is shuffled so that all rows with the same key end up in the same partition - and the second HashAggregate performs the final deduplication after the shuffle, removing duplicates that originally lived in different partitions.
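The two-phase pattern can be illustrated without Spark at all. The sketch below (plain Python, hypothetical partition data) mimics the three plan nodes: a partial HashAggregate per partition, an Exchange that routes equal keys to the same output partition, and a final HashAggregate:

```python
def partial_dedup(partition):
    # First HashAggregate: local deduplication within one partition.
    return list(dict.fromkeys(partition))

def exchange(partitions, num_out):
    # Exchange hashpartitioning: the same key always lands in the
    # same output partition, so duplicates are now co-located.
    out = [[] for _ in range(num_out)]
    for part in partitions:
        for key in part:
            out[hash(key) % num_out].append(key)
    return out

def final_dedup(partition):
    # Second HashAggregate: removes duplicates that originally
    # lived in different input partitions.
    return list(dict.fromkeys(partition))

# 201901 appears in both input partitions, so local dedup alone
# cannot produce a globally distinct result.
partitions = [[201901, 201902, 201901], [201901, 201903]]
locally_deduped = [partial_dedup(p) for p in partitions]
shuffled = exchange(locally_deduped, num_out=2)
result = sorted(k for p in shuffled for k in final_dedup(p))
print(result)  # [201901, 201902, 201903]
```

The partial step is an optimization: it shrinks the data before the (expensive) shuffle, but it can never see duplicates spread across partitions, which is exactly why the second HashAggregate is still required.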