I've been exploring query optimizations in the recent releases of Spark SQL 2.3.0-SNAPSHOT and noticed different physical plans for semantically-identical queries.
Let's assume I've got to count the number of rows in the following dataset:
val q = spark.range(1)
I could count the number of rows as follows:
q.count
q.collect.size
q.rdd.count
q.queryExecution.toRdd.count
My initial thought was that counting such a local, one-row dataset is essentially a constant-time operation that Spark SQL would optimize away and answer immediately, especially the first variant, where Spark SQL is in full control of the query execution.
A look at the physical plans of the queries led me to believe that the most efficient query would be the last:
q.queryExecution.toRdd.count
The reasons being that it works at the lowest level, on rows in the InternalRow binary format (no deserialization to the external row representation), and that its physical plan is as simple as it gets (see the sketch just below for how I inspect the plans).
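For reference, here is how the plans and the two RDD flavours can be inspected straight from the shell; a minimal sketch against the Spark 2.x API, with the val names purely for illustration:

val q = spark.range(1)

// Full query execution pipeline: parsed, analyzed, optimized and physical plans.
println(q.queryExecution)

// Physical plan only; operators prefixed with `*` run inside whole-stage codegen.
q.explain()

// q.rdd deserializes every InternalRow back into the Dataset's element type,
// whereas queryExecution.toRdd exposes the raw InternalRow binary rows.
val external = q.rdd                  // RDD[java.lang.Long]
val internal = q.queryExecution.toRdd // RDD[org.apache.spark.sql.catalyst.InternalRow]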
Is my reasoning correct? If so, would the answer be different if I read the dataset from an external data source (e.g. files, JDBC, Kafka)?
The main question is: what factors should be taken into consideration when deciding whether one query is more efficient than another (as in this example)?
The other execution plans for completeness.
/* Note: I'm using Spark 1.5.2, so some of these results may differ in newer versions.
 * Note: All runs used a 25-node cluster with 40 cores/node, started with
 *       --num-executors 64 --executor-cores 1 --executor-memory 4g
 * Note: The displayed values are the mean of 10 runs.
 * Note: The spark-shell was restarted every time I noticed any spikes intra-run.
 *
 * million = sc.parallelize(1 to 1000000).toDF("col1")
 * billion = sc.parallelize(1 to 1000000000).toDF("col1")
 *
 * val s0 = sc.parallelize(1 to 1000000000)
 * // had to use unions like this to get around the max-Int constraint for a single Seq
 * billion10 = sc.union(s0, s1, s2, s3, s4, s5, s6, s7, s8, s9).toDF("col1")
 *
 * For Parquet files:
 *   compression = uncompressed
 *   written with: million/billion/billion10.write.parquet
 *   read with:    sqlContext.read.parquet
 *
 * For text files:
 *   written with: million/billion/billion10.map(x => x.mkString(",")).saveAsTextFile
 *   read with:    sc.textFile.toDF("col1")
 *
 * collect() was excluded because it would have murdered my machine.
 * Everything was made a DataFrame for consistency.
 */
size       type     query           mean time (s)
---------  -------  --------------  -------------
billion10  text     count               81.594582
                    queryExecution      81.949047
                    rdd.count          119.710021
           Seq      count               18.768544
                    queryExecution      14.257751
                    rdd.count           36.404834
           parquet  count               12.016753
                    queryExecution      24.305452
                    rdd.count           41.932466
billion    text     count               14.120583
                    queryExecution      14.346528
                    rdd.count           22.240026
           Seq      count                2.191781
                    queryExecution       1.655651
                    rdd.count            2.831840
           parquet  count                2.004464
                    queryExecution       5.010546
                    rdd.count            7.815010
million    text     count                0.975095
                    queryExecution       0.113718
                    rdd.count            0.184904
           Seq      count                0.192044
                    queryExecution       0.029069
                    rdd.count            0.036061
           parquet  count                0.963874
                    queryExecution       0.217661
                    rdd.count            0.262279
Observations:
If anyone would like a different storage type, different dtypes counted, compression, or more columns tested, comment or message me and I'll see what I can do.
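For what it's worth, the timing loop behind these numbers looked roughly like the sketch below; the meanSeconds helper and the names are illustrative rather than the exact harness, written against the 1.5.x-era sc/sqlContext shell API:

import sqlContext.implicits._

// Illustrative helper: run a block n times and return the mean wall-clock time in seconds.
def meanSeconds(n: Int)(body: => Unit): Double = {
  val times = (1 to n).map { _ =>
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1e9
  }
  times.sum / n
}

val million = sc.parallelize(1 to 1000000).toDF("col1")

println(meanSeconds(10)(million.count()))                       // count
println(meanSeconds(10)(million.queryExecution.toRdd.count()))  // queryExecution
println(meanSeconds(10)(million.rdd.count()))                   // rdd.count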
I did some testing on val q = spark.range(100000000):

q.count: ~50 ms
q.collect.size: I stopped the query after a minute or so...
q.rdd.count: ~1100 ms
q.queryExecution.toRdd.count: ~600 ms

Some explanation:
Option 1 is by far the fastest because it uses both partial aggregation and whole-stage code generation. Whole-stage code generation allows the JVM to get really clever and do some drastic optimizations (see: https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).
Option 2 is just slow and materializes everything on the driver, which is generally a bad idea.
Option 3 is like option 4, but it first converts each internal row to a regular row, and that is quite expensive.
Option 4 is about as fast as you will get without whole-stage code generation.
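If you want to see how much of option 1's edge comes from whole-stage code generation, one experiment is to toggle it off and rerun the count. Note that spark.sql.codegen.wholeStage is an internal Spark 2.x setting, so treat this as a probe rather than a supported knob:

val q = spark.range(100000000L)

// With codegen enabled (the default), explain() marks compiled operators with `*`.
q.explain()
q.count()

// Disable whole-stage code generation and rerun; partial aggregation still applies,
// but the codegen advantage over options 3 and 4 goes away.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
q.explain()
q.count()

// Restore the default.
spark.conf.set("spark.sql.codegen.wholeStage", "true")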