 

Does Spark SQL do predicate pushdown on filtered equi-joins?

I am interested in using Spark SQL (1.6) to perform "filtered equi-joins" of the form

A inner join B where A.group_id = B.group_id and pair_filter_udf(A[cols], B[cols])

Here the group_id is coarse: a single value of group_id could be associated with, say, 10,000 records in both A and B.

If the equi-join were performed by itself, without the pair_filter_udf, the coarseness of group_id would create computational issues. For example, for a group_id with 10,000 records in both A and B, there would be 100 million entries in the join. If we had many thousands of such large groups, we would generate an enormous table and we could very easily run out of memory.

Thus, it is essential that we push down pair_filter_udf into the join and have it filter pairs as they are generated, rather than waiting until all pairs have been generated. My question is whether Spark SQL does this.
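For intuition, here is a pure-Python sketch (not Spark code; pair_filter below is a hypothetical stand-in for pair_filter_udf) of the difference between materializing all pairs of a group and filtering pairs as they are generated:

# pure-Python illustration only; pair_filter stands in for pair_filter_udf
def pair_filter(x, y):
    return abs(x - y) < 2

def join_then_filter(a_rows, b_rows):
    # materializes every pair for the group before filtering:
    # needs O(len(a_rows) * len(b_rows)) memory up front
    pairs = [(x, y) for x in a_rows for y in b_rows]
    return [p for p in pairs if pair_filter(*p)]

def filter_while_joining(a_rows, b_rows):
    # yields matching pairs lazily; never holds all pairs at once
    for x in a_rows:
        for y in b_rows:
            if pair_filter(x, y):
                yield (x, y)

With 10,000 records per group on each side, the first version builds a 100-million-pair list for the group before filtering, while the second keeps only one candidate pair in flight at a time.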

I set up a simple filtered equi-join and asked Spark what its query plan was:

# run in PySpark Shell
import pyspark.sql.functions as F

sq = sqlContext
n=100
g=10
a = sq.range(n)
a = a.withColumn('grp',F.floor(a['id']/g)*g)
a = a.withColumnRenamed('id','id_a')

b = sq.range(n)
b = b.withColumn('grp',F.floor(b['id']/g)*g)
b = b.withColumnRenamed('id','id_b')

c = a.join(b,(a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])
c = c.sort('id_a')
c = c[['grp','id_a','id_b']]
c.explain()

Result:

== Physical Plan ==
Sort [id_a#21L ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(id_a#21L ASC,200), None
      +- ConvertToSafe
         +- Project [grp#20L,id_a#21L,id_b#24L]
            +- Filter (abs((id_a#21L - id_b#24L)) < 2)
               +- SortMergeJoin [grp#20L], [grp#23L]
                  :- Sort [grp#20L ASC], false, 0
                  :  +- TungstenExchange hashpartitioning(grp#20L,200), None
                  :     +- Project [id#19L AS id_a#21L,(FLOOR((cast(id#19L as double) / 10.0)) * 10) AS grp#20L]
                  :        +- Scan ExistingRDD[id#19L] 
                  +- Sort [grp#23L ASC], false, 0
                     +- TungstenExchange hashpartitioning(grp#23L,200), None
                        +- Project [id#22L AS id_b#24L,(FLOOR((cast(id#22L as double) / 10.0)) * 10) AS grp#23L]
                           +- Scan ExistingRDD[id#22L]

These are the key lines from the plan:

+- Filter (abs((id_a#21L - id_b#24L)) < 2)
    +- SortMergeJoin [grp#20L], [grp#23L]

These lines give the impression that the filter will be done in a separate stage after the join, which is not the desired behavior. But maybe the filter is being implicitly pushed down into the join, and the query plan just lacks that level of detail.

How can I tell what Spark is doing in this case?

Update:

I'm running experiments with n=1e6 and g=1e5, which should be enough to crash my laptop if Spark is not doing pushdown. Since it is not crashing, I guess it is doing pushdown. But it would be interesting to know how it works and what parts of the Spark SQL source are responsible for this awesome optimization.
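For reference, here is a sketch of the larger run (the same construction as the snippet above, just with the bigger parameters; the count() at the end is one way to force execution so the job can be watched in the Spark UI):

# larger-scale version of the experiment above (PySpark 1.6 shell)
import pyspark.sql.functions as F

sq = sqlContext
n = int(1e6)   # rows per table
g = int(1e5)   # records per group -> ~1e10 raw pairs per group if fully materialized

a = sq.range(n).withColumn('grp', F.floor(F.col('id') / g) * g).withColumnRenamed('id', 'id_a')
b = sq.range(n).withColumn('grp', F.floor(F.col('id') / g) * g).withColumnRenamed('id', 'id_b')

c = a.join(b, (a.grp == b.grp) & (F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])

c.explain()   # same plan shape as above: Filter on top of SortMergeJoin
c.count()     # forces execution; this completes without exhausting memory on my laptop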

asked Mar 27 '16 by Paul



1 Answer

Quite a lot depends on what you mean by pushdown. If you are asking whether |a.id_a - b.id_b| < 2 is executed as part of the join logic alongside a.grp = b.grp, the answer is no. Predicates that are not based on equality are not directly included in the join condition.

One way to illustrate that is to look at the DAG instead of the execution plan. It should look more or less like this:

[DAG visualization: SortMergeJoin followed by a separate Filter transformation, both within a single stage]

As you can see, the filter is executed as a separate transformation from the SortMergeJoin. Another approach is to analyze the execution plan when you drop a.grp = b.grp. You'll see that the join expands to a Cartesian product followed by a filter, with no additional optimizations:

d = a.join(b,(F.abs(a['id_a'] - b['id_b']) < 2)).drop(b['grp'])

## == Physical Plan ==
## Project [id_a#2L,grp#1L,id_b#5L]
## +- Filter (abs((id_a#2L - id_b#5L)) < 2)
##    +- CartesianProduct
##       :- ConvertToSafe
##       :  +- Project [id#0L AS id_a#2L,(FLOOR((cast(id#0L as double) / 10.0)) * 10) AS grp#1L]
##       :     +- Scan ExistingRDD[id#0L] 
##       +- ConvertToSafe
##          +- Project [id#3L AS id_b#5L]
##             +- Scan ExistingRDD[id#3L]

Does this mean that your code (not the Cartesian variant, which you really want to avoid in practice) generates a huge intermediate table?

No, it doesn't. Both the SortMergeJoin and the filter are executed within a single stage (see the DAG). While some details of the DataFrame operations are applied at a slightly lower level, it is basically just a chain of transformations over Scala Iterators and, as Justin Pihony has shown in a very illustrative way, different operations can be squashed together without adding any Spark-specific logic. One way or another, both filters are applied within a single task.
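A rough pure-Python analogy of that pipelined, iterator-chained execution (not actual Spark internals, just an illustration of why a downstream Filter does not force the join output to be materialized):

# rough analogy only; Spark's SortMergeJoin is more sophisticated than this
def merge_join(groups_a, groups_b):
    # yield joined rows group by group instead of building the whole table
    for grp in sorted(groups_a.keys() & groups_b.keys()):
        for x in groups_a[grp]:
            for y in groups_b[grp]:
                yield (grp, x, y)

def filter_op(rows, predicate):
    # a downstream Filter just wraps the upstream iterator
    return (r for r in rows if predicate(r))

groups_a = {0: list(range(0, 10)), 10: list(range(10, 20))}
groups_b = {0: list(range(0, 10)), 10: list(range(10, 20))}

joined = merge_join(groups_a, groups_b)
filtered = filter_op(joined, lambda r: abs(r[1] - r[2]) < 2)

# rows flow through join -> filter one at a time within the same stage;
# the full per-group cross product is never held in memory at once
print(sum(1 for _ in filtered))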

answered Oct 16 '22 by zero323