According to this
Catalyst applies logical optimizations such as predicate pushdown. The optimizer can push filter predicates down into the data source, enabling the physical execution to skip irrelevant data.
Spark supports push down of predicates to the data source. Is this feature also available / expected for JDBC?
(From inspecting the DB logs I can see it's not the default behavior right now - the full query is passed to the DB, even if it's later limited by spark filters)
MORE DETAILS
Running Spark 1.5 with PostgreSQL 9.4
code snippet:
from pyspark import SQLContext, SparkContext, Row, SparkConf from data_access.data_access_db import REMOTE_CONNECTION sc = SparkContext() sqlContext = SQLContext(sc) url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION) sql = "dummy" df = sqlContext.read.jdbc(url=url, table=sql) df = df.limit(1) df.show()
SQL Trace:
< 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3 < 2015-09-15 07:11:37.771 EDT >LOG: execute <unnamed>: SELECT * FROM dummy WHERE 1=0 < 2015-09-15 07:11:37.830 EDT >LOG: execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, n.nspname, a.attnotnull OR (t.typtype = 'd' AND t.typnotnull), pg_catalog.pg_get_expr(d.adbin, d.a drelid) LIKE '%nextval(%' FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (c.oid = a.attrelid) JOIN pg_catalog.pg_type t ON (a.a tttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = a.attrelid AND d.adnum = a.attnum) JOIN (SELECT 15218474 AS oid , 1 AS attnum UNION ALL SELECT 15218474, 3) vals ON (c.oid = vals.oid AND a.attnum = vals.attnum) < 2015-09-15 07:11:40.936 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3 < 2015-09-15 07:11:40.964 EDT >LOG: execute <unnamed>: SELECT "id","name" FROM dummy
I would expect that the last select will include a limit 1
clause - but it doesn't
A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default the Spark Dataset API will automatically push down valid WHERE clauses to the database.
Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD.
1. Predicate Pushdown. Predicate Pushdown points to the where or filter clause which affects the number of rows returned. It basically relates to which rows will be filtered, not which columns. For this reason, while applying the filter on a nested column as 'library.
Spark DataFrames support predicate push-down with JDBC sources but term predicate is used in a strict SQL meaning. It means it covers only WHERE
clause. Moreover it looks like it is limited to the logical conjunction (no IN
and OR
I am afraid) and simple predicates.
Everything else, like limits, counts, ordering, groups and conditions is processed on the Spark side. One caveat, already covered on SO, is that df.count()
or sqlContext.sql("SELECT COUNT(*) FROM df")
is translated to SELECT 1 FROM df
and requires both substantial data transfer and processing using Spark.
Does it mean it is a lost cause? Not exactly. It is possible to use an arbitrary subquery as a table
argument. It is less convenient than a predicate pushdown but otherwise works pretty well:
n = ... # Number of rows to take sql = "(SELECT * FROM dummy LIMIT {0}) AS tmp".format(int(n)) df = sqlContext.read.jdbc(url=url, table=sql)
Note:
This behavior may be improved in the future, once Data Source API v2 is ready:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With