Whether I use Spark SQL directly or spark-shell, I don't know how to check explicitly that the Spark Catalyst query optimizer is doing its work.
For example, let's assume that I created a HiveContext as follows:
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
Then I process a query as follows:
hiveContext.sql("""
  |SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg
  |FROM date_dim dt, store_sales, item
  |WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  |AND store_sales.ss_item_sk = item.i_item_sk
  |AND item.i_manufact_id = 128
  |AND dt.d_moy = 11
  |GROUP BY dt.d_year, item.i_brand, item.i_brand_id
  |ORDER BY dt.d_year, sum_agg desc, brand_id
  |LIMIT 100
  """.stripMargin).collect().foreach(println)
Is there a way to check whether the Catalyst optimizer is in use? If it isn't, how can I enable it for HiveContext?
The Catalyst query optimizer is always enabled in Spark 2.0; there is nothing to switch on. Its optimizations come for free whenever you work with Spark 2.0's Datasets, which includes DataFrames and queries submitted with sql (and that is one of the many reasons you should really be using Datasets before going low-level with RDDs).
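To confirm that Catalyst is at work without touching the logging configuration, you can compare a query's analyzed logical plan with its optimized one through `Dataset.queryExecution`. The sketch below assumes Spark 2.x running locally; the tiny in-memory DataFrame, the `CatalystCheck` object and the app name are hypothetical stand-ins for the Hive tables in the question. Note that in Spark 2.x `HiveContext` is deprecated in favour of `SparkSession`, and calling `.enableHiveSupport()` on the builder gives access to Hive tables when the Hive classes are on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CatalystCheck {
  // Hypothetical sample query standing in for the Hive query in the question.
  def sampleQuery(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq((1L, "a", 10), (2L, "b", 20)).toDF("id", "name", "amount")
    df.filter($"amount" > 5).select($"name")
  }

  def main(args: Array[String]): Unit = {
    // SparkSession replaces HiveContext in Spark 2.x. Add .enableHiveSupport()
    // to the builder when Hive classes are on the classpath and you query Hive tables.
    val spark = SparkSession.builder()
      .appName("catalyst-check") // hypothetical app name
      .master("local[*]")
      .getOrCreate()

    val qe = sampleQuery(spark).queryExecution
    println(qe.analyzed)      // logical plan after analysis, before optimization
    println(qe.optimizedPlan) // the same plan after Catalyst's optimizer rules have run
    spark.stop()
  }
}
```

Any difference between the two printed plans is Catalyst's doing; `explain(true)` on the same query prints the parsed, analyzed, optimized and physical plans in one call.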
If you want to see which optimizations the Catalyst query optimizer applies to your query, set the logging level of SparkOptimizer to TRACE in conf/log4j.properties:
log4j.logger.org.apache.spark.sql.execution.SparkOptimizer=TRACE
With that setting, whenever your query is planned (through show, collect, or even a mere explain, which plans the query without running it), you'll see plenty of log output describing the work the Catalyst query optimizer is doing for you.
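If editing conf/log4j.properties is inconvenient, the same logger can be raised from code. This is a minimal sketch assuming Spark 2.x, which bundles log4j 1.2 (Spark 3.3 and later moved to log4j 2, where this log4j 1.x API may not take effect); `OptimizerTracing` and `enableOptimizerTrace` are hypothetical names.

```scala
import org.apache.log4j.{Level, Logger}

object OptimizerTracing {
  // Logger name taken from the log4j.properties entry above.
  val OptimizerLogger = "org.apache.spark.sql.execution.SparkOptimizer"

  // Sets the optimizer's logger to TRACE at runtime (log4j 1.x API, assumed Spark 2.x).
  def enableOptimizerTrace(): Unit =
    Logger.getLogger(OptimizerLogger).setLevel(Level.TRACE)
}
```

In spark-shell you would call `OptimizerTracing.enableOptimizerTrace()`, or just the one-line `Logger.getLogger(...).setLevel(Level.TRACE)` it wraps, before running `explain`.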
Let's see the ColumnPruning optimization rule in action.
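// upper is not imported by spark-shell by default (spark.implicits._ is)
import org.apache.spark.sql.functions.upper
// the business object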
case class Person(id: Long, name: String, city: String)
// the dataset to query over
val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS
// the query
// Note that we work with names only (out of 3 attributes in Person)
val query = dataset.groupBy(upper('name) as 'name).count
scala> query.explain(extended = true)
...
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ColumnPruning ===
 Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]   Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
!+- LocalRelation [id#125L, name#126, city#127]                                       +- Project [name#126]
!                                                                                        +- LocalRelation [id#125L, name#126, city#127]
...
== Parsed Logical Plan ==
'Aggregate [upper('name) AS name#160], [upper('name) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]
== Analyzed Logical Plan ==
name: string, count: bigint
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [id#125L, name#126, city#127]
== Optimized Logical Plan ==
Aggregate [upper(name#126)], [upper(name#126) AS name#160, count(1) AS count#166L]
+- LocalRelation [name#126]
== Physical Plan ==
*HashAggregate(keys=[upper(name#126)#171], functions=[count(1)], output=[name#160, count#166L])
+- Exchange hashpartitioning(upper(name#126)#171, 200)
   +- *HashAggregate(keys=[upper(name#126) AS upper(name#126)#171], functions=[partial_count(1)], output=[upper(name#126)#171, count#173L])
      +- LocalTableScan [name#126]
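Instead of reading the plans by eye, you can also check the effect of ColumnPruning programmatically. The sketch below, assuming Spark 2.x, rebuilds the same query (using `$"name"` instead of the symbol syntax) and lists the columns read by the leaf nodes of the optimized plan; `ColumnPruningCheck` and `leafColumns` are hypothetical names. With ColumnPruning applied, only `name` should remain, matching `LocalRelation [name#126]` in the optimized logical plan above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.upper

object ColumnPruningCheck {
  case class Person(id: Long, name: String, city: String)

  // Returns the names of the attributes produced by the leaf nodes of the optimized plan.
  def leafColumns(spark: SparkSession): Seq[String] = {
    import spark.implicits._
    val dataset = Seq(Person(0, "Jacek", "Warsaw")).toDS()
    val query = dataset.groupBy(upper($"name") as "name").count()
    val optimized = query.queryExecution.optimizedPlan
    // Leaves are plan nodes without children (LocalRelation in this example).
    optimized
      .collect { case leaf if leaf.children.isEmpty => leaf }
      .flatMap(_.output.map(_.name))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("column-pruning-check") // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    println(leafColumns(spark)) // expected to contain only name once ColumnPruning has run
    spark.stop()
  }
}
```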