
Caching ordered Spark DataFrame creates unwanted job

I want to convert an RDD to a DataFrame and cache the result:

from pyspark.sql import Row
from pyspark.sql.types import DoubleType, StructField, StructType

# `spark` (SparkSession) and `sc` (SparkContext) are the objects predefined in the PySpark shell.
schema = StructType([StructField('t', DoubleType()), StructField('value', DoubleType())])

# The two commented-out `.cache()` calls mark the placements compared below.
df = spark.createDataFrame(
    sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(),
    schema=schema,
    verifySchema=False
).orderBy("t") #.cache()
  • If you don't call cache at all, no job is generated.
  • If you call cache only after the orderBy, one job is generated for the cache.
  • If you call cache only after the parallelize, no job is generated.

Why does cache generate a job in this one case? How can I avoid the job that cache triggers (while caching the DataFrame, not the RDD)?

Edit: I investigated the problem further and found that without the orderBy("t") no job is generated. Why?

R1tschY asked Mar 22 '17 12:03




1 Answer

I submitted a bug ticket and it was closed with the following reason:

Caching requires the backing RDD. That requires we also know the backing partitions, and this is somewhat special for a global order: it triggers a job (scan) because we need to determine the partition bounds.
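
To illustrate why a global order needs a pass over the data before any partition can be laid out, here is a minimal plain-Python sketch of range partitioning. It is not Spark's implementation: the helper names range_bounds and partition_for, the sample size of 100, and the fixed seed are all assumptions made for the example. The point is only that the split points are derived from the data itself, so something has to read the data first.

```python
import bisect
import random


def range_bounds(sample, num_partitions):
    """Pick num_partitions - 1 split points from a sample of the keys (hypothetical helper)."""
    ordered = sorted(sample)
    step = len(ordered) / num_partitions
    return [ordered[int(step * i)] for i in range(1, num_partitions)]


def partition_for(key, bounds):
    """Return the index of the range partition a key falls into (hypothetical helper)."""
    return bisect.bisect_left(bounds, key)


# Same keys as in the question: t = i / 10 for i in range(1000).
keys = [i / 10 for i in range(1000)]

# The bounds depend on the data, so the data must be read (here: sampled)
# before the partitions of the sorted result are known.
rng = random.Random(0)  # fixed seed, chosen only to make the sketch repeatable
sample = rng.sample(keys, 100)
bounds = range_bounds(sample, 4)

# With the bounds known, every key can be routed to its range partition.
partitions = [[] for _ in range(4)]
for key in keys:
    partitions[partition_for(key, bounds)].append(key)
```

Sorting each of the four partitions locally then yields a globally ordered result. In the sketch, the sampling step plays the role of the extra job described in the quote above.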

R1tschY answered Sep 21 '22 18:09
