
How to cache partitioned dataset and use in multiple queries?

I have the following code:

dataset
       .distinct()
       .repartition(400)
       .persist(StorageLevel.MEMORY_ONLY())
       .createOrReplaceTempView("temp");
sqlContext.sql("select * from temp");

This is just an example; I need to execute around 100 queries over the same entity, which is why I'm persisting it. I thought that querying temp would hit the cached entity, but when I check the Query Details in the Spark UI, I see that a repartition is executed for each query over temp, so the dataset is re-read and the whole DAG is re-executed for every query.

----------------- EDIT ------------------------

Here are the diagram and the logical plans of the queries; to me they look the same. My expectation was that the first query would execute all the required steps and that subsequent queries would directly access the in-memory view.

I have checked with sqlContext.isCached("temp") and it prints true.

Query execution Diagram


First Query Plan

== Parsed Logical Plan ==
'Project [11 AS tenant_id#4958, cube_purchase_details AS cube_name#4959, purchase_activity AS field#4960, 'purchase_activity AS value#4961]
+- 'UnresolvedRelation `filter_temp`

== Analyzed Logical Plan ==
tenant_id: string, cube_name: string, field: string, value: string
Project [11 AS tenant_id#4958, cube_purchase_details AS cube_name#4959, purchase_activity AS field#4960, purchase_activity#4062 AS value#4961]
+- SubqueryAlias filter_temp, `filter_temp`
   +- Aggregate [purchase_activity#4062], [purchase_activity#4062]
      +- Project [purchase_activity#4062]
         +- Repartition 400, true
            +- GlobalLimit 10000
               +- LocalLimit 10000
                  +- Project [purchase_activity#4062, top_shop_1#4069, top_brand_1#4072, top_brand_2#4073, top_brand_3#4074, top_brand_4#4075, top_brand_5#4076, top_manufacturer_1#4077, top_manufacturer_2#4078, top_manufacturer_3#4079, top_manufacturer_4#4080, top_manufacturer_5#4081, top_product_category_1#4082, top_product_category_2#4083, top_product_category_3#4084, top_product_category_4#4085, top_product_category_5#4086, top_salesperson_1#4093, top_salesperson_2#4094, top_salesperson_3#4095, age_category#4109, inactive#4115, activity_id#4144, activity_name#4145, ... 67 more fields]
                     +- Relation[purchase_detail_id#3918,tenant_id#3919,purchase_detail_date#3920,purchase_detail_type#3921,user_id#3922,user_domain#3923,purchase_id#3924,purchase_date#3925,is_purchase#3926,year#3927,quarter#3928,month#3929,week#3930,weekday#3931,day#3932,former_purchase_id#3933,pd_shop_id#3934,customer_id#3935,loyalty_id#3936,quantity#3937,unit_price#3938,total_price#3939,discount#3940,currency#3941,... 219 more fields] parquet

Other Queries Plan

== Parsed Logical Plan ==
'Project [11 AS tenant_id#6816, cube_purchase_details AS cube_name#6817, top_brand_1 AS field#6818, 'top_brand_1 AS value#6819]
+- 'UnresolvedRelation `filter_temp`

== Analyzed Logical Plan ==
tenant_id: string, cube_name: string, field: string, value: string
Project [11 AS tenant_id#6816, cube_purchase_details AS cube_name#6817, top_brand_1 AS field#6818, top_brand_1#4072 AS value#6819]
+- SubqueryAlias filter_temp, `filter_temp`
   +- Aggregate [top_brand_1#4072], [top_brand_1#4072]
      +- Project [top_brand_1#4072]
         +- Repartition 400, true
            +- GlobalLimit 10000
               +- LocalLimit 10000
                  +- Project [purchase_activity#4062, top_shop_1#4069, top_brand_1#4072, top_brand_2#4073, top_brand_3#4074, top_brand_4#4075, top_brand_5#4076, top_manufacturer_1#4077, top_manufacturer_2#4078, top_manufacturer_3#4079, top_manufacturer_4#4080, top_manufacturer_5#4081, top_product_category_1#4082, top_product_category_2#4083, top_product_category_3#4084, top_product_category_4#4085, top_product_category_5#4086, top_salesperson_1#4093, top_salesperson_2#4094, top_salesperson_3#4095, age_category#4109, inactive#4115, activity_id#4144, activity_name#4145, ... 67 more fields]
                     +- Relation[purchase_detail_id#3918,tenant_id#3919,purchase_detail_date#3920,purchase_detail_type#3921,user_id#3922,user_domain#3923,purchase_id#3924,purchase_date#3925,is_purchase#3926,year#3927,quarter#3928,month#3929,week#3930,weekday#3931,day#3932,former_purchase_id#3933,pd_shop_id#3934,customer_id#3935,loyalty_id#3936,quantity#3937,unit_price#3938,total_price#3939,discount#3940,currency#3941,... 219 more fields] parquet

Here is a screenshot of the Spark UI Storage page in case that could be helpful too.


How can I access this persisted dataset from spark-sql?

asked May 12 '17 by José


2 Answers

How can I access this persisted dataset from spark-sql?

Spark SQL will re-use cached queries for you as long as you reference them in your other queries. Use the explain operator to confirm this, and check the web UI's Details for Job page (under the Jobs tab).


Dataset's persist and cache operators are lazy (contrary to SQL's CACHE TABLE statement), and hence after the following the dataset is not actually cached yet.

dataset
  .distinct()
  .repartition(400)
  .persist(StorageLevel.MEMORY_ONLY())
  .createOrReplaceTempView("temp");

persist(StorageLevel.MEMORY_ONLY()) is just a hint that Spark SQL should cache the relation next time an action is executed. That leads to a pattern where people execute head or count actions to trigger caching.
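For example, a minimal sketch of that pattern in Scala (the dataset and the temp view name come from the question above; the trailing count() is an arbitrary action, used here only to force materialization of the cache):

import org.apache.spark.storage.StorageLevel

// persist() only marks the plan for caching; nothing is materialized yet
val cached = dataset
  .distinct()
  .repartition(400)
  .persist(StorageLevel.MEMORY_ONLY)

cached.createOrReplaceTempView("temp")

// the first action materializes the cached data; the ~100 follow-up queries
// against "temp" can then read from memory instead of re-executing the DAG
cached.count()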

After a table is persisted you can use web UI's Storage tab to see its corresponding cached RDD entry.

Moreover, executing cache on an already-cached dataset will give you a warning.

scala> q.distinct.repartition(5).cache.head
17/05/16 10:57:49 WARN CacheManager: Asked to cache already cached data.
res4: org.apache.spark.sql.Row = [2]

scala> q.cache()
17/05/16 10:59:54 WARN CacheManager: Asked to cache already cached data.
res6: q.type = [key: bigint]

I think you'd expect that after caching, the execution plan would be shorter, i.e. the steps that have already been executed would be removed from the plan. Is that correct?

If so, your understanding is partially correct. Partially, because although parts of the query plan still appear in the execution plan, once the query is executed the cached table (and the corresponding stages) is fetched from the cache instead of being recomputed.

You can check this in the Details for Job page for the job that participates in the query. The cached part is marked with a small green circle in the execution DAG.

Execution Plan with Cached Part

Quoting from Understanding your Apache Spark Application Through Visualization:

Second, one of the RDDs is cached in the first stage (denoted by the green highlight). Since the enclosing operation involves reading from HDFS, caching this RDD means future computations on this RDD can access at least a subset of the original file from memory instead of from HDFS.

You can also review a query and see which data sources are cached using the explain operator.

scala> q.explain
== Physical Plan ==
InMemoryTableScan [key#21L]
   +- InMemoryRelation [key#21L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [(id#18L % 5) AS key#21L]
            +- *Range (0, 1000, step=1, splits=8)
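
Applied to the question's setup, the same check would look something like this (a sketch only; the temp view name comes from the question, and the exact plan depends on your data):

spark.sql("SELECT * FROM temp").explain()
// once the cache has been materialized, the physical plan should start with
// InMemoryTableScan over an InMemoryRelation instead of re-running the
// Repartition over the parquet relation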
answered Sep 20 '22 by Jacek Laskowski


Spark 1.x:

You can use SQLContext.cacheTable:

dataset
  .distinct()
  .repartition(400)
  .registerTempTable("temp");

sqlContext.cacheTable("temp");

Spark 2.x:

You can use Catalog.cacheTable:

dataset
  .distinct()
  .repartition(400)
  .createOrReplaceTempView("temp");

spark.catalog.cacheTable("temp");
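
To confirm the flag and release the memory once the ~100 queries are done, the same catalog also provides isCached and uncacheTable (a small sketch in Scala, reusing the temp view name from above):

// returns true as soon as the view is marked for caching
println(spark.catalog.isCached("temp"))

// drop the cached data when it is no longer needed
spark.catalog.uncacheTable("temp")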
answered Sep 18 '22 by zero323