
How to checkpoint DataFrames?

I'm looking for a way to checkpoint DataFrames. Checkpoint is currently an operation on RDD but I can't find how to do it with DataFrames. persist and cache (which are synonyms for each other) are available for DataFrame but they do not "break the lineage" and are thus unsuitable for methods that could loop for hundreds (or thousands) of iterations.

As an example, suppose that I have a list of functions whose signature is DataFrame => DataFrame. I want to have a way to compute the following even when myfunctions has hundreds or thousands of entries:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
  myfunctions.foldLeft(dataset) {
    case (df, f) =>
      val nextDF = f(df)
      g(nextDF)
      nextDF
  }
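For example, myfunctions might look something like the sketch below (the column names and steps are purely illustrative); after hundreds of such steps the lineage of the resulting DataFrame just keeps growing:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical pipeline steps, only for illustration.
val myfunctions: List[DataFrame => DataFrame] = List(
  _.withColumn("doubled", col("value") * 2), // assumes an input column "value"
  _.filter(col("doubled") > 0),
  _.withColumn("flag", lit(true))
  // ... hundreds or thousands more
)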
Daniel Shields, asked Oct 29 '15

People also ask

How do you do a checkpoint in Pyspark?

Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir(). New in version 2.1.
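Since Spark 2.1 this is a single call on the DataFrame itself. A minimal sketch using the Scala API (PySpark exposes the same method); the checkpoint directory path is an assumption:

spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path
val checkpointed = df.checkpoint() // eager by default: materializes the data and truncates the plan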

What is checkpoint in Databricks?

Azure Databricks uses the checkpoint directory to ensure correct and consistent progress information. When a stream is shut down, either purposely or accidentally, the checkpoint directory allows Azure Databricks to restart and pick up exactly where it left off.
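In Structured Streaming the directory is supplied per query. A minimal sketch (Spark 2.x Scala API; streamingDF and both paths are hypothetical):

// checkpointLocation stores offsets and progress so the query can resume where it left off
val query = streamingDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "/mnt/checkpoints/events")
  .start("/mnt/output/events")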

How do I add a checkpoint in Spark?

There are two types of Apache Spark checkpointing. Reliable checkpointing saves the actual RDD to a reliable distributed file system, e.g. HDFS; to set the checkpoint directory, call SparkContext.setCheckpointDir(directory: String). Local checkpointing instead keeps the data in executor storage.
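A minimal sketch of reliable RDD checkpointing (the HDFS path is an assumption):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // must be a reliable file system
val rdd = sc.parallelize(1 to 1000).map(_ * 2)
rdd.checkpoint() // only marks the RDD; nothing is written yet
rdd.count()      // the next action materializes the checkpoint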

How do you read a Spark checkpoint?

Use SparkContext.setCheckpointDir to set the path to a checkpoint directory. Checkpointing can be local or reliable, which determines how reliable the checkpoint data is. Local checkpointing writes checkpoint files to executor storage and, because of the executor lifecycle, is considered unreliable.
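Local checkpointing needs no checkpoint directory at all; a short sketch (someRdd is hypothetical):

someRdd.localCheckpoint() // kept in executor storage: faster, but lost if an executor dies
someRdd.count()           // as with reliable checkpointing, an action triggers the truncation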


1 Answer

TL;DR: For Spark versions up to 1.6, to actually get a "checkpointed DF", my suggested solution is based on another answer, but with one extra line:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema) // df2 is checkpointed
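Wired into the question's foldLeft loop, this would look roughly like the sketch below. The every-N-iterations policy and the CHECKPOINT_EVERY constant are my own additions, not part of the answer, and it assumes sc.setCheckpointDir has already been called:

val CHECKPOINT_EVERY = 10 // hypothetical: truncate the lineage every 10 steps

def foo(dataset: DataFrame, g: DataFrame => Unit): DataFrame =
  myfunctions.zipWithIndex.foldLeft(dataset) {
    case (df, (f, i)) =>
      val nextDF = f(df)
      g(nextDF)
      if ((i + 1) % CHECKPOINT_EVERY == 0) {
        nextDF.rdd.checkpoint
        nextDF.rdd.count // action that materializes the checkpoint
        sqlContext.createDataFrame(nextDF.rdd, nextDF.schema) // lineage truncated here
      } else nextDF
  }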

Explanation

Updated after further research.

As pointed out, checkpointing a DataFrame directly is not currently (Spark 1.6.1) possible, though there is an issue for it on Spark's Jira.

So, a possible workaround is the one suggested on another answer:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count          // An action to compute the checkpoint

However, with this approach, only the df.rdd object will be checkpointed. This can be verified by calling toDebugString on df.rdd:

scala> df.rdd.toDebugString
(32) MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

Calling toDebugString after a quick transformation of df (note that I created my DataFrame from a JDBC source), however, returns the following:

scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

df.explain also shows a hint:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

So, to actually achieve a "checkpointed" DataFrame, I can only think of creating a new one from the checkpointed RDD:

val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map {
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")

Then we can verify that the new DataFrame is "checkpointed":

1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3) With transformation:

scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

I also tried some more complex transformations and was able to verify, in practice, that the newDF object was checkpointed.

Therefore, the only way I found to reliably checkpoint a DataFrame was by checkpointing its associated RDD and creating a new DataFrame object from it.

I hope it helps. Cheers.

Daniel de Paula, answered Sep 23 '22