 

How can I convince Spark not to make an exchange when the join key is a super-set of the bucketBy key?

While testing for a production use-case I have created and saved (using Hive Metastore) such tables:

table1:
fields: key1, key2, value1
sortedBy: key1, key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1, key2
bucketBy: key1, 100 buckets
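
For reference, here is a minimal Scala sketch of how such tables could be created; the column and table names come from the question, while the toy data and writer options are my assumptions about the setup:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bucketed-join-demo")
  .enableHiveSupport()                       // tables are saved through the Hive Metastore
  .getOrCreate()
import spark.implicits._

// Toy data standing in for the real tables.
val df1 = Seq((1, 10, "a"), (2, 20, "b")).toDF("key1", "key2", "value1")
val df2 = Seq((1, 10, "x"), (2, 20, "y")).toDF("key1", "key2", "value2")

// Bucketed by key1 only, sorted by (key1, key2), as described above.
df1.write.bucketBy(100, "key1").sortBy("key1", "key2").saveAsTable("table1")
df2.write.bucketBy(100, "key1").sortBy("key1", "key2").saveAsTable("table2")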

I'm running a query like this (in pseudocode):

table1.join(table2, ["key1", "key2"])
 .groupBy("value2")
 .countUnique("key1")

Common sense says that this join should simply be done as a sort-merge join with no exchange; however, Spark does an exchange and then the join.
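
In actual Spark code (Scala here; the pseudocode's countUnique corresponds to countDistinct in the DataFrame API), the query and the plan check might look roughly like this sketch:

import org.apache.spark.sql.functions.countDistinct

val table1 = spark.table("table1")
val table2 = spark.table("table2")

val result = table1
  .join(table2, Seq("key1", "key2"))   // join key is a super-set of the bucket key
  .groupBy("value2")
  .agg(countDistinct("key1"))

// The physical plan shows an Exchange (shuffle) feeding the SortMergeJoin,
// even though both sides are already bucketed by key1.
result.explain()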

Even though I could have bucketed by both keys for this particular use case, other use cases require me to bucket by key1 only. And when I do a (simpler) join using a single key like this:

table1.join(table2, ["key1"])

It works as expected (i.e. sort-merge join with no exchange).

Now that I have an optimized join on these tables, if I want to add a filter, like this:

table1.join(table2, ["key1"])
 .filter(table1.col("key2") == table2.col("key2"))

It reverts to the exchange followed by the join.

How can I convince Spark not to make an exchange when the join key is a super-set of the bucketBy key?

Note:

One trick I know: if I rewrite the equality check as a pair of inequality checks, Spark does not shuffle.

(x == y) can also be expressed as ((x >= y) & (x <= y)). If I apply two filters like this to the last example:

.filter(table1.col("key2") >= table2.col("key2"))
.filter(table1.col("key2") <= table2.col("key2"))

It continues to use a sort-merge join with no exchange; however, this is not a solution, it is a hack.
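
For completeness, the hack rendered as a runnable Scala sketch (table1 and table2 being the DataFrames read from the bucketed tables):

// The same predicate expressed as two inequalities; the optimizer no longer
// treats key2 as a join key, so only key1 (the bucket key) drives the join
// and no Exchange is inserted.
val hacked = table1
  .join(table2, Seq("key1"))
  .filter(table1("key2") >= table2("key2"))
  .filter(table1("key2") <= table2("key2"))

hacked.explain()   // SortMergeJoin without an Exchange, but via a hack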

asked Jun 25 '19 by zetaprime

1 Answer

Based on some research and exploration, this seems to be the least hacky solution:

Building on this example:

table1.join(table2, ["key1"])
      .filter(table1.col("key2") == table2.col("key2"))

Instead of using the equalTo (==) from Spark, implementing a custom MyEqualTo (delegating to the Spark EqualTo implementation is fine) seems to solve the issue. This way, Spark won't optimize(!) the join, and it will just pull the filter up into the SortMergeJoin.
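
A minimal Scala sketch of such an expression, written against the Spark 2.4-era Catalyst API (this is an internal, version-dependent API; newer releases require additional overrides such as withNewChildrenInternal). The names MyEqualTo and myEqualTo are mine, not Spark's:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, EqualTo, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BooleanType, DataType}

// Evaluates exactly like EqualTo, but because it is a different class the
// optimizer does not extract it as an equi-join key; it stays a plain filter
// on top of the sort-merge join instead of forcing an Exchange.
case class MyEqualTo(left: Expression, right: Expression)
    extends BinaryExpression with CodegenFallback {

  // Delegate the actual comparison semantics to Spark's own EqualTo.
  private lazy val delegate = EqualTo(left, right)

  override def dataType: DataType = BooleanType
  override def nullable: Boolean = delegate.nullable
  override def eval(input: InternalRow): Any = delegate.eval(input)
}

object MyEqualTo {
  // Column-level helper so the expression can be used in DataFrame code.
  def myEqualTo(l: Column, r: Column): Column = new Column(MyEqualTo(l.expr, r.expr))
}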

Similarly, the join condition can also be formed as such:

(table1.col("key1") == table2.col("key1")) AND
table1.col("key2").myEqualTo(table2.col("key2"))
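
Putting it together, a usage sketch with the hypothetical myEqualTo helper from above (in the Scala API the plain equality on key1 is written with ===):

import MyEqualTo.myEqualTo

val table1 = spark.table("table1")
val table2 = spark.table("table2")

// key1 stays a regular equi-join key (it matches the bucketing); key2 goes
// through the custom predicate, so it is treated as a join filter instead of
// a second join key, and no Exchange is required.
val joined = table1.join(
  table2,
  table1("key1") === table2("key1") && myEqualTo(table1("key2"), table2("key2"))
)

joined.explain()   // expect SortMergeJoin with no Exchange on either side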
answered Sep 26 '22 by zetaprime