Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why is predicate pushdown not used in typed Dataset API (vs untyped DataFrame API)?

I always thought that dataset/dataframe API's are the same.. and the only difference is that dataset API will give you compile time safety. Right ?

So.. I have very simple case:

 case class Player (playerID: String, birthYear: Int)

 val playersDs: Dataset[Player] = session.read
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .csv(PeopleCsv)
  .as[Player]

 // Let's try to find players born in 1999. 
 // This will work, you have compile time safety... but it will not use predicate pushdown!!!
 playersDs.filter(_.birthYear == 1999).explain()

 // This will work as expected and use predicate pushdown!!!
 // But you can't have compile time safety with this :(
 playersDs.filter('birthYear === 1999).explain()

Explain from first example will show that it's NOT doing predicate pushdown (Notice empty PushedFilters):

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

While the second sample will do it correctly (Notice PushedFilters):

== Physical Plan ==
*(1) Project [.....]
+- *(1) Filter (isnotnull(birthYear#11) && (birthYear#11 = 1999))
   +- *(1) FileScan csv [...] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:People.csv], PartitionFilters: [], PushedFilters: [IsNotNull(birthYear), EqualTo(birthYear,1999)], ReadSchema: struct<playerID:string,birthYear:int,birthMonth:int,birthDay:int,birthCountry:string,birthState:s...

So the question is.. how can I use DS Api, and have compile time safety.., and predicate pushdown working as expected ????

Is it possible ? If not.. does this mean that DS api gives you compile time safety.. but at the cost of performance!! ??? (DF will be much faster in this case.. especially when processing large parquet files)

like image 619
Pawel Niezgoda Avatar asked May 02 '18 07:05

Pawel Niezgoda


People also ask

What is predicate pushdown?

A predicate push down filters the data in the database query, reducing the number of entries retrieved from the database and improving query performance. By default the Spark Dataset API will automatically push down valid WHERE clauses to the database.

Does parquet support predicate pushdown?

Parquet allows for predicate pushdown filtering, a form of query pushdown because the file footer stores row-group level metadata for each column in the file.

Does Avro support predicate pushdown?

Predicate pushdown is a data processing technique taking user-defined filters and executing them while reading the data. Apache Spark already supported it for Apache Parquet and RDBMS. Starting from Apache Spark 3.1. 1, you can also use them for Apache Avro, JSON and CSV formats!

Which option can be used in spark SQL if you need to use an in memory columnar structure to cache tables?

Spark SQL can cache tables using an in-memory columnar format by calling spark. catalog. cacheTable("tableName") or dataFrame. cache() .


1 Answers

That's the line in your Physical Plan you should remember to know the real difference between Dataset[T] and DataFrame (which is Dataset[Row]).

Filter <function1>.apply

I keep saying that people should stay away from the typed Dataset API and keep using the untyped DataFrame API as the Scala code becomes a black box to the optimizer in too many places. You've just hit one of these and think also about the deserialization of all the objects that Spark SQL keeps away from JVM to avoid GCs. Every time you touch the objects you literally ask Spark SQL to deserialize objects and load them onto JVM that puts a lot of pressure on GC (which will get triggered more often with the typed Dataset API as compared to the untyped DataFrame API).

See UDFs are Blackbox — Don’t Use Them Unless You’ve Got No Choice.


Quoting Reynold Xin after I asked the very same question on [email protected] mailing list:

The UDF is a black box so Spark can't know what it is dealing with. There are simple cases in which we can analyze the UDFs byte code and infer what it is doing, but it is pretty difficult to do in general.

There is a JIRA ticket for such cases SPARK-14083 Analyze JVM bytecode and turn closures into Catalyst expressions, but as someone said (I think it was Adam B. on twitter) it'd be a kind of joke to expect it any time soon.

One big advantage of the Dataset API is the type safety, at the cost of performance due to heavy reliance on user-defined closures/lambdas. These closures are typically slower than expressions because we have more flexibility to optimize expressions (known data types, no virtual function calls, etc). In many cases, it's actually not going to be very difficult to look into the byte code of these closures and figure out what they are trying to do. If we can understand them, then we can turn them directly into Catalyst expressions for more optimized executions.


// Let's try to find players born in 1999. 
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()

The above code is equivalent to the following:

val someCodeSparkSQLCannotDoMuchOutOfIt = (p: Player) => p.birthYear == 1999
playersDs.filter(someCodeSparkSQLCannotDoMuchOutOfIt).explain()

someCodeSparkSQLCannotDoMuchOutOfIt is exactly where you put optimizations aside and let Spark Optimizer skip it.

like image 74
Jacek Laskowski Avatar answered Oct 11 '22 02:10

Jacek Laskowski