I am assessing the performance of different ways of loading Parquet files in Spark and the differences are staggering.
In our Parquet files, we have nested case classes of the type:
case class C(/* a dozen of attributes*/)
case class B(/* a dozen of attributes*/, cs: Seq[C])
case class A(/* a dozen of attributes*/, bs: Seq[B])
It takes a while to load them from Parquet files. So I've done a benchmark of different ways of loading case classes from Parquet files and summing a field using Spark 1.6 and 2.0.
Here is a summary of the benchmark I did:
val df: DataFrame = sqlContext.read.parquet("path/to/file.gz.parquet").persist()
df.count()
// Spark 1.6
// Play Json
// 63.169s
df.toJSON.flatMap(s => Try(Json.parse(s).as[A]).toOption)
.map(_.fieldToSum).sum()
// Direct access to field using Spark Row
// 2.811s
df.map(row => row.getAs[Long]("fieldToSum")).sum()
// Some small library we developed that access fields using Spark Row
// 10.401s
df.toRDD[A].map(_.fieldToSum).sum()
// Dataframe hybrid SQL API
// 0.239s
df.agg(sum("fieldToSum")).collect().head.getAs[Long](0)
// Dataset with RDD-style code
// 34.223s
df.as[A].map(_.fieldToSum).reduce(_ + _)
// Dataset with column selection
// 0.176s
df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _)
// Spark 2.0
// Performance is similar except for:
// Direct access to field using Spark Row
// 23.168s
df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
// Some small library we developed that access fields using Spark Row
// 32.898s
f1DF.toRDD[A].map(_.fieldToSum).sum()
I understand why the performance of methods using Spark Row is degraded when upgrading to Spark 2.0, since Dataframe
is now a mere alias of Dataset[Row]
.
That's the cost of unifying the interfaces, I guess.
On the other hand, I'm quite disappointed that the promise of Dataset
is not kept: performance when using RDD-style coding (map
s and flatMap
s) is worse than when using Dataset
like Dataframe
with SQL-like DSL.
Basically, to have good performance, we need to give up type safety.
What is the reason for such difference between Dataset
used as RDD and Dataset
used as Dataframe
?
Is there a way to improve encoding performance in Dataset
to equate RDD-style coding and SQL-style coding performance? For data engineering, it's much cleaner to have RDD-style coding.
Also, working with the SQL-like DSL would require to flatten our data model and not use nested case classes. Am I right that good performance is only achieved with flat data models?
Parquet has higher execution speed compared to other standard file formats like Avro,JSON etc and it also consumes less disk space in compare to AVRO and JSON.
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.
What is the reason for such difference between Dataset used as RDD and Dataset used as Dataframe?
To get some insights let's think for a moment about optimizations used by Spark SQL. As far as I am aware there three types of improvements over plain RDD
:
Now the problem is that not all of these techniques are useful outside restricted programming models, like SQL.
For example it is possible to pushdown selections (filters) but projections are quite limited (you cannot really have a part of the object, can you?). Similarly code generation depends on well defined semantics and it is not easy to it applied in general (it is basically a compiler which generates code that can further optimized by JVM).
Finally sun.misc.Unsafe
is an amazing way to improve performance but it doesn't come for free. While there is a lot of gain here there is also a significant overhead of encoding and decoding.
working with the SQL-like DSL would require to flatten our data model and not use nested case classes.
Nested structures are not exactly the first class citizens and there are some poorly documented limitations you can still do quite lot here.
performance of methods using Spark Row is degraded when upgrading to Spark 2.0, since Dataframe is now a mere alias of Dataset[Row]. That's the cost of unifying the interfaces, I guess.
While there have been some performance regression these two pieces of code are simply not equivalent. In 2.0.0+ DataFrame.map
has different signature than it's 1.x counterpart. If you want to make these two comparable you should convert to RDD
first:
df.rdd.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With