 

Temp table caching with spark-sql


Is a table registered with registerTempTable (createOrReplaceTempView in Spark 2.x) cached?

In Zeppelin, I register a DataFrame from my Scala code after some heavy computation. Then, within %pyspark, I want to access it and filter it further.

Will it use a memory-cached version of the table? Or will it be rebuilt each time?

Cedric H. asked Aug 31 '16 at 11:08




Registered tables are not cached in memory.

The registerTempTable method (createOrReplaceTempView in Spark 2.x) just creates or replaces a view of the given DataFrame under the given name, storing its query plan.

Only when you create a permanent view does Spark convert the query plan to a canonicalized SQL string and store it as view text in the metastore. In neither case is any data materialized or kept in memory.
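You can check this from code. The following is a minimal sketch, assuming Spark 2.x or later running locally; the object name, the view name `my_table`, and the sample data are only illustrative. Right after the view is registered, `spark.catalog.isCached` should report `false`, because nothing has been persisted yet.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; assumes Spark 2.x or later on the classpath.
object TempViewIsNotCached {
  // Registers a temp view and asks the catalog whether it is cached.
  def viewIsCached(spark: SparkSession): Boolean = {
    import spark.implicits._
    // Start from an empty cache so the answer reflects only this view.
    spark.catalog.clearCache()
    val df = Seq(("1", 2), ("b", 3)).toDF()
    // Stores the DataFrame's logical plan under a name; no data is persisted.
    df.createOrReplaceTempView("my_table")
    spark.catalog.isCached("my_table")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("temp-view-demo").getOrCreate()
    try println(s"cached after createOrReplaceTempView: ${viewIsCached(spark)}")
    finally spark.stop()
  }
}
```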

You'll need to cache your DataFrame explicitly, e.g.:

df.createOrReplaceTempView("my_table")  # df.registerTempTable("my_table") for Spark < 2.0
spark.catalog.cacheTable("my_table")    # sqlContext.cacheTable("my_table") for Spark < 2.0

EDIT:

Let's illustrate this with an example.

Using cacheTable:

scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> sc.getPersistentRDDs
// res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> spark.catalog.cacheTable("my_table") // sqlContext.cacheTable("...") before Spark 2.0

scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(2 -> In-memory table my_table MapPartitionsRDD[2] at cacheTable at <console>:26)
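As far as I know, `spark.catalog.cacheTable` is lazy: the in-memory RDD is registered, which is what `getPersistentRDDs` shows, but it is only filled by the first action on the table. A sketch of the full cycle under that assumption (Spark 2.x or later; object and view names are illustrative): cache the view, force materialization with an action, then release the memory with `uncacheTable`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical demo object; assumes Spark 2.x or later on the classpath.
object CacheTableLifecycle {
  // Returns (cached after cacheTable, row count, cached after uncacheTable).
  def lifecycle(spark: SparkSession): (Boolean, Long, Boolean) = {
    import spark.implicits._
    Seq(("1", 2), ("b", 3)).toDF().createOrReplaceTempView("my_table")

    // Marks the view's plan for in-memory caching; no rows are computed yet.
    spark.catalog.cacheTable("my_table")
    val cachedAfterCacheTable = spark.catalog.isCached("my_table")

    // The first action scans the table and fills the in-memory columnar cache.
    val rows = spark.table("my_table").count()

    // Drops the cached data; the view itself stays registered.
    spark.catalog.uncacheTable("my_table")
    (cachedAfterCacheTable, rows, spark.catalog.isCached("my_table"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cache-table-demo").getOrCreate()
    try println(lifecycle(spark))
    finally spark.stop()
  }
}
```

If you would rather cache eagerly, the SQL statement `CACHE TABLE my_table` caches immediately, while `CACHE LAZY TABLE my_table` defers the work the same way `cacheTable` does.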

Now the same example using df.cache followed by registerTempTable (createOrReplaceTempView in Spark 2.x):

scala> sc.getPersistentRDDs
// res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> val df = Seq(("1",2),("b",3)).toDF
// df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache.createOrReplaceTempView("my_table")

scala> sc.getPersistentRDDs
// res6: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = 
// Map(2 -> ConvertToUnsafe
// +- LocalTableScan [_1#0,_2#1], [[1,2],[b,3]]
//  MapPartitionsRDD[2] at cache at <console>:28)
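Instead of inspecting `sc.getPersistentRDDs`, you can ask the DataFrame itself. This is a sketch assuming Spark 2.1 or later (where `Dataset.storageLevel` exists); the object name and data are illustrative. The storage level should be `StorageLevel.NONE` before `cache()`, something else right after it (the exact default level may vary by version), and `NONE` again after `unpersist()`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; assumes Spark 2.1 or later on the classpath.
object DataFrameCacheCheck {
  // Returns (level before cache, level after cache, level after unpersist).
  def check(spark: SparkSession): (StorageLevel, StorageLevel, StorageLevel) = {
    import spark.implicits._
    // Start from an empty cache so earlier runs do not affect the result.
    spark.catalog.clearCache()
    val df = Seq(("1", 2), ("b", 3)).toDF()
    val before = df.storageLevel
    // cache() marks the plan for persistence (lazily); the view reuses that plan.
    df.cache().createOrReplaceTempView("my_table")
    val after = df.storageLevel
    // Removes the cache entry for this plan.
    df.unpersist()
    (before, after, df.storageLevel)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("df-cache-demo").getOrCreate()
    try println(check(spark))
    finally spark.stop()
  }
}
```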
eliasah answered Sep 18 '22 at 16:09
