HashAggregate in SparkSQL Query Plan

I am trying to understand the query plans generated by Spark SQL (2.4). Below are the following query and its corresponding query plan. (The query is just a test query.)

create temporary view tab_in as
select distinct 
       mth_id 
from tgt_tbl;

select /*+ BROADCAST(c) */
a.mth_id,
a.qtr_id,
a.prod_id,
a.sale_date
from my_table a
left anti join tab_in c
on a.mth_id = c.mth_id;

Explain plan:

+- *(3) Project [mth_id#652, qtr_id#653, prod_id#655, sale_dt#656]
   +- *(3) BroadcastHashJoin [mth_id#652], [mth_id#867], LeftAnti, BuildRight
      :- *(3) Project [mth_id#652, qtr_id#653, sale_dt#656, prod_id#655]
      :  +- *(3) FileScan parquet test_db.my_table[mth_id#652, qtr_id#653, prod_id#655, sale_dt#656] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://test-data/my_table/0], PartitionCount: 1, PartitionFilters: [], PushedFilters: [], ReadSchema ......
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
         +- *(2) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867])
            +- Exchange hashpartitioning(mth_id#867, 200)
               +- *(1) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867])
                  +- *(1) Project [mth_id#867]
                     +- *(1) Filter isnotnull(mth_id#867)
                        +- *(1) FileScan parquet test_db.my_table[mth_id#867] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3://test-data/tgt_tbl/20200609], PartitionFilters: [], PushedFilters: [IsNotNull(mth_id)], ReadSchema: struct<mth_id:int>

As can be seen above, the plan contains two HashAggregates: one before and one after the Exchange hashpartitioning. I figured that the first HashAggregate is probably due to the DISTINCT clause in the first query, but I cannot work out the reason for the second HashAggregate (the one after the Exchange).

I have tried combining the two queries into one by putting the first query in a WITH clause (CTE), but I still got the same plan.

Can someone please explain why the second HashAggregate (reading from the bottom) is needed?

Any help is appreciated. Thanks!

asked Oct 27 '25 07:10 by marie20

1 Answer

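Both HashAggregates in the plan come from the deduplication (DISTINCT). HashAggregate usually comes in a pair. Here the first one, marked *(1), performs a partial (local) deduplication within each partition, before any data is moved. Then comes the Exchange: the data has to be shuffled with hashpartitioning(mth_id#867, 200), so that all rows with the same mth_id end up in the same partition (200 matches the default value of spark.sql.shuffle.partitions). The second HashAggregate, marked *(2), is then responsible for the final deduplication after the shuffle. Doing the partial step first reduces the amount of data that has to be shuffled.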
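To make the two-phase idea concrete, here is a toy, single-process sketch in plain Python (not Spark code) that mimics the plan above: a partial dedup per partition, a hash exchange, a final dedup, and then the broadcast left anti join. Lists stand in for partitions; the sample values, the partition count of 4 and all function names are hypothetical, and Spark uses its own hash function rather than Python's `hash()`.

```python
from itertools import chain

# Toy, single-process model of the plan above. Plain lists stand in for
# partitions; the data and function names are hypothetical.

def dedup(rows):
    """Remove duplicates, keeping first-seen order (like a keys-only HashAggregate)."""
    return list(dict.fromkeys(rows))

def hash_exchange(partitions, num_partitions):
    """Mimic Exchange hashpartitioning(key, n): send each key to partition hash(key) % n."""
    shuffled = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key in part:
            shuffled[hash(key) % num_partitions].append(key)
    return shuffled

def distinct_two_phase(partitions, num_partitions=4):
    # *(1) HashAggregate: partial dedup inside each input partition (no shuffle yet)
    partial = [dedup(p) for p in partitions]
    # Exchange: equal keys now land in the same output partition
    shuffled = hash_exchange(partial, num_partitions)
    # *(2) HashAggregate: final dedup, which only needs to look at its own partition
    return [dedup(p) for p in shuffled]

def broadcast_left_anti_join(left_rows, right_keys, key):
    # BroadcastHashJoin ... LeftAnti, BuildRight: build a hash set from the small
    # (broadcast) right side, then keep only left rows whose key has no match.
    build_side = set(right_keys)
    return [row for row in left_rows if row[key] not in build_side]

# Hypothetical input: two partitions of tgt_tbl.mth_id with duplicates
# both inside and across partitions.
tgt_partitions = [[202001, 202002, 202001], [202002, 202003, 202003]]

partial_only = [dedup(p) for p in tgt_partitions]
tab_in = list(chain.from_iterable(distinct_two_phase(tgt_partitions)))

my_table = [
    {"mth_id": 202001, "prod_id": 1},
    {"mth_id": 202004, "prod_id": 2},
]
result = broadcast_left_anti_join(my_table, tab_in, "mth_id")

print(partial_only)    # [[202001, 202002], [202002, 202003]]  <- 202002 still duplicated
print(sorted(tab_in))  # [202001, 202002, 202003]
print(result)          # [{'mth_id': 202004, 'prod_id': 2}]
```

The first printed line shows why a single aggregate is not enough: a partial aggregate only sees its own partition, so 202002 survives twice until the Exchange brings equal keys together for the final aggregate.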

answered Oct 29 '25 10:10 by David Vrba


