I have the following code:
dataset
.distinct()
.repartition(400)
.persist(StorageLevel.MEMORY_ONLY())
.createOrReplaceTempView("temp");
sqlContext.sql("select * from temp");
This is just an example; I need to execute around 100 queries over the same entity, which is why I'm persisting it.
I thought that when I query temp
it would hit the cached entity, but when I check the Query Details in the Spark UI, I see that a repartition is executed for each query over temp
, so each query reads the dataset again and re-executes the whole DAG.
----------------- EDIT ------------------------
Here are the diagram and logical plan of the queries; to me they look the same. My expectation was that the first query would execute all the required steps and that subsequent queries would access the in-memory view directly.
I have checked with sqlContext.isCached("temp")
and it prints true.
Query execution Diagram
First Query Plan
== Parsed Logical Plan ==
'Project [11 AS tenant_id#4958, cube_purchase_details AS cube_name#4959, purchase_activity AS field#4960, 'purchase_activity AS value#4961]
+- 'UnresolvedRelation `filter_temp`
== Analyzed Logical Plan ==
tenant_id: string, cube_name: string, field: string, value: string
Project [11 AS tenant_id#4958, cube_purchase_details AS cube_name#4959, purchase_activity AS field#4960, purchase_activity#4062 AS value#4961]
+- SubqueryAlias filter_temp, `filter_temp`
+- Aggregate [purchase_activity#4062], [purchase_activity#4062]
+- Project [purchase_activity#4062]
+- Repartition 400, true
+- GlobalLimit 10000
+- LocalLimit 10000
+- Project [purchase_activity#4062, top_shop_1#4069, top_brand_1#4072, top_brand_2#4073, top_brand_3#4074, top_brand_4#4075, top_brand_5#4076, top_manufacturer_1#4077, top_manufacturer_2#4078, top_manufacturer_3#4079, top_manufacturer_4#4080, top_manufacturer_5#4081, top_product_category_1#4082, top_product_category_2#4083, top_product_category_3#4084, top_product_category_4#4085, top_product_category_5#4086, top_salesperson_1#4093, top_salesperson_2#4094, top_salesperson_3#4095, age_category#4109, inactive#4115, activity_id#4144, activity_name#4145, ... 67 more fields]
+- Relation[purchase_detail_id#3918,tenant_id#3919,purchase_detail_date#3920,purchase_detail_type#3921,user_id#3922,user_domain#3923,purchase_id#3924,purchase_date#3925,is_purchase#3926,year#3927,quarter#3928,month#3929,week#3930,weekday#3931,day#3932,former_purchase_id#3933,pd_shop_id#3934,customer_id#3935,loyalty_id#3936,quantity#3937,unit_price#3938,total_price#3939,discount#3940,currency#3941,... 219 more fields] parquet
Other Queries Plan
== Parsed Logical Plan ==
'Project [11 AS tenant_id#6816, cube_purchase_details AS cube_name#6817, top_brand_1 AS field#6818, 'top_brand_1 AS value#6819]
+- 'UnresolvedRelation `filter_temp`
== Analyzed Logical Plan ==
tenant_id: string, cube_name: string, field: string, value: string
Project [11 AS tenant_id#6816, cube_purchase_details AS cube_name#6817, top_brand_1 AS field#6818, top_brand_1#4072 AS value#6819]
+- SubqueryAlias filter_temp, `filter_temp`
+- Aggregate [top_brand_1#4072], [top_brand_1#4072]
+- Project [top_brand_1#4072]
+- Repartition 400, true
+- GlobalLimit 10000
+- LocalLimit 10000
+- Project [purchase_activity#4062, top_shop_1#4069, top_brand_1#4072, top_brand_2#4073, top_brand_3#4074, top_brand_4#4075, top_brand_5#4076, top_manufacturer_1#4077, top_manufacturer_2#4078, top_manufacturer_3#4079, top_manufacturer_4#4080, top_manufacturer_5#4081, top_product_category_1#4082, top_product_category_2#4083, top_product_category_3#4084, top_product_category_4#4085, top_product_category_5#4086, top_salesperson_1#4093, top_salesperson_2#4094, top_salesperson_3#4095, age_category#4109, inactive#4115, activity_id#4144, activity_name#4145, ... 67 more fields]
+- Relation[purchase_detail_id#3918,tenant_id#3919,purchase_detail_date#3920,purchase_detail_type#3921,user_id#3922,user_domain#3923,purchase_id#3924,purchase_date#3925,is_purchase#3926,year#3927,quarter#3928,month#3929,week#3930,weekday#3931,day#3932,former_purchase_id#3933,pd_shop_id#3934,customer_id#3935,loyalty_id#3936,quantity#3937,unit_price#3938,total_price#3939,discount#3940,currency#3941,... 219 more fields] parquet
Here is a screenshot of the Spark UI Storage page, in case that could be helpful too.
How can I access this persisted dataset from spark-sql?
How can I access this persisted dataset from spark-sql?
Spark SQL will re-use cached queries for you as long as you reference them in your other queries. Use the explain
operator to confirm, and check the web UI's Details for Job page (under the Jobs tab).
Dataset's persist
and cache
operators are lazy (contrary to SQL's CACHE TABLE
queries), and hence after the following the dataset won't really be cached yet.
dataset
.distinct()
.repartition(400)
.persist(StorageLevel.MEMORY_ONLY())
.createOrReplaceTempView("temp");
persist(StorageLevel.MEMORY_ONLY())
is just a hint that Spark SQL should cache the relation the next time an action is executed. That leads to a pattern where people execute a head
or count
action to trigger caching.
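For example, a minimal sketch of that pattern in Scala (assuming an existing Dataset named dataset and a SparkSession named spark) could look like this:
import org.apache.spark.storage.StorageLevel

// persist only marks the data for caching; the cache is filled by the first action
val prepared = dataset
  .distinct()
  .repartition(400)
  .persist(StorageLevel.MEMORY_ONLY)

// an action such as count or head computes the full dataset and materializes the cache
prepared.count()

prepared.createOrReplaceTempView("temp")

// queries over "temp" should now read from the in-memory relation
spark.sql("select * from temp")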
After a table is persisted you can use the web UI's Storage tab to see its corresponding cached RDD entry.
Moreover, executing cache
on an already-cached dataset will give you a warning.
scala> q.distinct.repartition(5).cache.head
17/05/16 10:57:49 WARN CacheManager: Asked to cache already cached data.
res4: org.apache.spark.sql.Row = [2]
scala> q.cache()
17/05/16 10:59:54 WARN CacheManager: Asked to cache already cached data.
res6: q.type = [key: bigint]
I think you'd expect that after caching the execution plan would somehow be shorter, i.e. the steps that have already been executed would be removed from the plan. Is that correct?
If so, your understanding is partially correct. Partially because, although parts of the query plan still appear in the execution plan, once the query is executed the cached table (and the corresponding stages) should already be fetched from the cache rather than recomputed.
You can check it out in the Details for Job page for the job that participates in the query. The cached part is marked with a small green circle in the execution DAG.
Quoting from Understanding your Apache Spark Application Through Visualization:
Second, one of the RDDs is cached in the first stage (denoted by the green highlight). Since the enclosing operation involves reading from HDFS, caching this RDD means future computations on this RDD can access at least a subset of the original file from memory instead of from HDFS.
You can also review a query, and which of its data sources are cached, using the explain
operator.
scala> q.explain
== Physical Plan ==
InMemoryTableScan [key#21L]
+- InMemoryRelation [key#21L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [(id#18L % 5) AS key#21L]
+- *Range (0, 1000, step=1, splits=8)
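The same check works for a SQL query over the temp view; a short sketch (assuming a SparkSession named spark and the view from above):
// the physical plan should show an InMemoryTableScan over an InMemoryRelation when the cache is used
spark.sql("select * from temp").explain()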
Spark 1.x:
You can use SQLContext.cacheTable
:
dataset
.distinct()
.repartition(400)
.registerTempTable("temp");
sqlContext.cacheTable("temp");
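Alternatively, since SQL's CACHE TABLE is eager (as noted above), a sketch of caching the view without a separate triggering action:
sqlContext.sql("CACHE TABLE temp");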
Spark 2.x:
You can use Catalog.cacheTable
(available as spark.catalog.cacheTable):
dataset
.distinct()
.repartition(400)
.createOrReplaceTempView("temp");
spark.catalog.cacheTable("temp");
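You can then check the cache state and release it once you are done with the ~100 queries; a sketch (assuming a SparkSession named spark):
spark.catalog.isCached("temp")           // true once the table is marked for caching
spark.sql("select * from temp").count()  // first query fills the cache if it is not materialized yet
// ... run the remaining queries against "temp" ...
spark.catalog.uncacheTable("temp")       // release the executors' memory afterwards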