Does Spark predicate pushdown work with JDBC?

According to this

Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data.

So Spark supports pushing predicates down to the data source. Is this feature also available / expected to work for JDBC data sources?

(From inspecting the DB logs I can see this is not the default behavior right now - the full query is passed to the DB, even if it is later limited by Spark filters.)

MORE DETAILS

Running Spark 1.5 with PostgreSQL 9.4

code snippet:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

from data_access.data_access_db import REMOTE_CONNECTION

sc = SparkContext()
sqlContext = SQLContext(sc)

url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"

df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)
df.show()

SQL Trace:

< 2015-09-15 07:11:37.718 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:37.771 EDT >LOG:  execute <unnamed>: SELECT * FROM dummy WHERE 1=0
< 2015-09-15 07:11:37.830 EDT >LOG:  execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.adrelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum)
< 2015-09-15 07:11:40.936 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:40.964 EDT >LOG:  execute <unnamed>: SELECT "id","name" FROM dummy

I would expect the last SELECT to include a LIMIT 1 clause - but it doesn't.
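For reference, the same thing can be seen from the Spark side by printing the physical plan. This is only a sketch against the same dummy table, not part of the original trace:

df = sqlContext.read.jdbc(url=url, table="dummy")
df.limit(1).explain()
# The Limit operator sits in the Spark plan above the JDBC scan, which is
# consistent with the trace above: the database only ever receives
#   SELECT "id","name" FROM dummy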

Asked Sep 14 '15 by Ophir Yoktan

1 Answer

Spark DataFrames support predicate push-down with JDBC sources, but the term predicate is used in its strict SQL meaning: it covers only the WHERE clause. Moreover, it looks like it is limited to logical conjunction (no IN and OR, I am afraid) and simple predicates.
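For example (a minimal sketch reusing the url and sqlContext from the question, and assuming an integer id column on the dummy table; the exact SQL rendering can differ between Spark versions), a simple comparison filter does end up in the query sent over JDBC:

df = sqlContext.read.jdbc(url=url, table="dummy")
df.filter(df.id == 1).show()
# The DB log should now show something along the lines of:
#   SELECT "id","name" FROM dummy WHERE id = 1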

Everything else, like limits, counts, ordering, groups and conditions, is processed on the Spark side. One caveat, already covered on SO, is that df.count() or sqlContext.sql("SELECT COUNT(*) FROM df") is translated to SELECT 1 FROM df and requires both substantial data transfer and processing in Spark.
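To illustrate (again only a sketch against the question's dummy table, reusing url and sqlContext): none of the following change the SQL sent to Postgres, which stays SELECT "id","name" FROM dummy; the extra work shows up as operators in the Spark plan instead.

df = sqlContext.read.jdbc(url=url, table="dummy")
df.orderBy("id").explain()              # ordering applied by Spark, not pushed to the DB
df.groupBy("name").count().explain()    # aggregation applied by Spark
df.count()                              # still transfers data over JDBC and counts on the Spark side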

Does it mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as a table argument. It is less convenient than predicate pushdown, but otherwise it works pretty well:

n = ...  # Number of rows to take
sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n))
df = sqlContext.read.jdbc(url=url, table=sql)
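One follow-up worth noting (an observation about how the JDBC source composes queries, not something stated in the original answer): the subquery string simply takes the place of the table name in the generated FROM clause, so WHERE predicates applied to the resulting DataFrame are still eligible for pushdown around it.

df.where(df.id > 10).show()
# Roughly: SELECT "id","name" FROM (SELECT * FROM dummy LIMIT ...) AS tmp WHERE id > 10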

Note:

This behavior may be improved in the future, once Data Source API v2 is ready:

  • SPARK-15689
  • SPIP: Data Source API V2
Answered Oct 16 '22 by zero323