 

How to know which count query is the fastest?

I've been exploring query optimizations in recent builds of Spark SQL 2.3.0-SNAPSHOT and noticed different physical plans for semantically identical queries.

Let's assume I've got to count the number of rows in the following dataset:

val q = spark.range(1)

I could count the number of rows as follows:

  1. q.count
  2. q.collect.size
  3. q.rdd.count
  4. q.queryExecution.toRdd.count

My initial thought was that each of these is an almost constant-time operation (surely because the dataset is local) that Spark SQL would optimize and answer immediately, especially the first one, where Spark SQL is in full control of the query execution.

Having had a look at the physical plans of the queries led me to believe that the most effective query would be the last:

q.queryExecution.toRdd.count

The reasons being that:

  1. It avoids deserializing rows from their InternalRow binary format
  2. The query is codegened
  3. There's only one job with a single stage

The physical plan is as simple as that.

(Screenshots omitted: the physical plan and the Spark UI "Details for Job" page showing a single stage.)
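For reference, the plans behind each variant can be inspected directly from the shell. This is a rough sketch (the exact explain output, and whether q.count compiles to exactly the groupBy().count() plan, depend on the Spark version):

val q = spark.range(1)

q.explain()                                      // physical plan of the Dataset itself
q.groupBy().count().explain()                    // roughly the global-aggregate plan that q.count executes
println(q.rdd.toDebugString)                     // RDD lineage; the extra MapPartitionsRDD is the row deserialization
println(q.queryExecution.toRdd.toDebugString)    // lineage over InternalRows, no deserialization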

Is my reasoning correct? If so, would the answer be different if I read the dataset from an external data source (e.g. files, JDBC, Kafka)?

The main question is: what factors should be taken into consideration to decide whether one query is more efficient than another (per this example)?


The other execution plans, for completeness (screenshots omitted: the physical plans and Spark UI job details for q.count, q.collect.size, and q.rdd.count).

asked May 08 '17 by Jacek Laskowski

2 Answers

Formatting stuff is terrible, oh well

/*note: I'm using Spark 1.5.2, so some of these might differ from what you would find in a newer version
 *note: these were all run on a 25-node cluster, 40 cores/node, started with --num-executors 64 --executor-cores 1 --executor-memory 4g
 *note: the displayed values are the mean from 10 runs
 *note: the spark-shell was restarted every time I noticed any spikes intra-run
 *
 *million = sc.parallelize(1 to 1000000).toDF("col1")
 *billion = sc.parallelize(1 to 1000000000).toDF("col1")
 *
 *val s0 = sc.parallelize(1 to 1000000000)
 *//had to use this to get around maxInt constraints for Seq (s1 through s9 built the same way)
 *billion10 = sc.union(s0,s1,s2,s3,s4,s5,s6,s7,s8,s9).toDF("col1")
 *
 *for parquet files
 *compression=uncompressed
 *written with:    million/billion/billion10.write.parquet
 *read with:    sqlContext.read.parquet
 *
 *for text files
 *written with:    million/billion/billion10.map(x=> x.mkString(",")).saveAsTextFile
 *read with:    sc.textFile.toDF("col1")
 *
 *excluded the collect() because that would have murdered my machine
 *made them all dataframes for consistency
 */


size       type     query              time (s)
billion10  text     count              81.594582
                    queryExecution     81.949047
                    rdd.count         119.710021
           Seq      count              18.768544
                    queryExecution     14.257751
                    rdd.count          36.404834
           parquet  count              12.016753
                    queryExecution     24.305452
                    rdd.count          41.932466
billion    text     count              14.120583
                    queryExecution     14.346528
                    rdd.count          22.240026
           Seq      count               2.191781
                    queryExecution      1.655651
                    rdd.count           2.831840
           parquet  count               2.004464
                    queryExecution      5.010546
                    rdd.count           7.815010
million    text     count               0.975095
                    queryExecution      0.113718
                    rdd.count           0.184904
           Seq      count               0.192044
                    queryExecution      0.029069
                    rdd.count           0.036061
           parquet  count               0.963874
                    queryExecution      0.217661
                    rdd.count           0.262279

Observations:

  • For the million-record runs, the Seq was the fastest, but tenths of a second are hard to read as a true speed difference across a cluster
  • Storing the data as text was slow in general; it could be how I was reading it in. I almost exclusively work with Parquet, so it would be easy for me to miss something there
  • count and queryExecution are faster than rdd.count for every case (as Herman rationalized in his answer)
  • count and queryExecution take turns being faster and vary between datatypes:
    • count is faster for parquet
    • queryExecution is faster for Seq
    • for Text they are nearly identical
  • the speeds are non-linear as size increases

If anyone would like to see a different storage type, different dtypes being counted, compression, or more columns, go ahead and comment or message me and I'll see what I can do.
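For anyone who wants to rerun something similar, a minimal harness along these lines would produce numbers like the table above (a sketch only; the timeIt helper and the parquet path are assumptions, not the code actually used):

//hypothetical timing helper, Spark 1.5.x style (sqlContext); a mean would still need multiple runs
def timeIt[T](label: String)(body: => T): Unit = {
  val start = System.nanoTime()
  body
  println(f"$label%-18s ${(System.nanoTime() - start) / 1e9}%.6f s")
}

val df = sqlContext.read.parquet("/tmp/billion.parquet")  //path is an assumption
timeIt("count")(df.count())
timeIt("queryExecution")(df.queryExecution.toRdd.count())
timeIt("rdd.count")(df.rdd.count())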

answered by James Tobin


I did some testing on val q = spark.range(100000000):

  1. q.count: ~50 ms
  2. q.collect.size: I stopped the query after a minute or so...
  3. q.rdd.count: ~1100 ms
  4. q.queryExecution.toRdd.count: ~600 ms

Some explanation:

Option 1 is by far the fastest because it uses both partial aggregation and whole stage code generation. The whole stage code generation allows the JVM to get really clever and do some drastic optimizations (see: https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html).

Option 2 is just slow: it materializes everything on the driver, which is generally a bad idea.

Option 3 is like option 4, but it first converts each internal row into a regular row, and this is quite expensive.

Option 4 is about as fast as you will get without whole stage code generation.
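If you want to see the generated code behind option 1, something like this works in Spark 2.x (a sketch; debugCodegen is part of a developer API and its output format can change between releases):

import org.apache.spark.sql.execution.debug._   //developer API for inspecting generated code

val q = spark.range(100000000)
q.groupBy().count().explain()        //shows the partial and final aggregate pair
q.groupBy().count().debugCodegen()   //prints the Java source produced by whole stage code generation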

answered by Herman van Hovell