How to enable Catalyst Query Optimiser in Spark SQL?

Whether I use Spark SQL directly or the Spark shell, I don't know how to explicitly verify that the Spark Catalyst Query Optimizer is at work.

For example, let's assume I create a HiveContext as follows:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Then, when I run a query such as:

hiveContext.sql("""
        | SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg
        | FROM  date_dim dt, store_sales, item
        | WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
        |   AND store_sales.ss_item_sk = item.i_item_sk
        |   AND item.i_manufact_id = 128
        |   AND dt.d_moy=11
        | GROUP BY dt.d_year, item.i_brand, item.i_brand_id
        | ORDER BY dt.d_year, sum_agg desc, brand_id
        | LIMIT 100
        """).collect().foreach(println)

Is there a way to check whether the Catalyst optimizer is present? And if it is not enabled, how can we enable it for HiveContext?

asked Mar 06 '26 by YoonMin Nam

1 Answer

Catalyst Query Optimizer is always enabled in Spark 2.0. It is a part of the optimizations you get for free when you work with Spark 2.0's Datasets (and one of the many reasons you should really be using Datasets before going low level with RDDs).
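
If you just want to confirm that Catalyst processed a query, you don't need any configuration at all: every Dataset exposes the plans Catalyst produced for it through queryExecution. A minimal sketch, assuming a spark-shell session where spark and its implicits are already in scope:

// Any Dataset/DataFrame carries the plans Catalyst built for it.
val df = Seq((1, "a"), (2, "b")).toDF("id", "value").filter($"id" > 1)

// Analyzed vs. optimized logical plan; any difference between the two
// is direct evidence that Catalyst's optimization rules fired.
println(df.queryExecution.analyzed)
println(df.queryExecution.optimizedPlan)

// Or print all plans (parsed, analyzed, optimized, physical) at once:
df.explain(extended = true)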

If you want to see the optimizations the Catalyst Query Optimizer applies to your query, set the TRACE logging level for SparkOptimizer in conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE
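
That is the Log4j 1.x syntax Spark used at the time. If your Spark build ships Log4j 2 instead (Spark 3.3 and later), the equivalent entry in conf/log4j2.properties should look roughly like this; note that "optimizer" here is an arbitrary logger id I chose, and only the .name value matters:

logger.optimizer.name = org.apache.spark.sql.execution.SparkOptimizer
logger.optimizer.level = trace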

With that, whenever you trigger execution of your query (through show, collect, or even a mere explain), you'll see plenty of log lines describing the work the Catalyst Query Optimizer is doing for you.

Let's see the ColumnPruning optimization rule in action.

// In spark-shell, spark.implicits._ and org.apache.spark.sql.functions._
// are already imported; in a standalone application you would add:
//   import spark.implicits._
//   import org.apache.spark.sql.functions.upper

// the business object
case class Person(id: Long, name: String, city: String)

// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS

// the query
// Note that we work with names only (out of the 3 attributes in Person)
val query = dataset.groupBy(upper('name) as 'name).count

scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]   Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127]                                       +- Project [name#126]
!                                                                                        +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]

== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]

== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
   +- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
      +- LocalTableScan [name#126]
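
Compare the Analyzed and Optimized logical plans above: once ColumnPruning fires, the LocalRelation carries only name#126 instead of [id#125L, name#126, city#127], so the id and city columns are never materialized. The same pruning shows up in the physical plan as LocalTableScan [name#126].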
answered Mar 09 '26 by Jacek Laskowski