I'm looking for a way to checkpoint DataFrames. Checkpoint is currently an operation on RDDs, but I can't find how to do it with DataFrames. persist and cache (which are synonyms for each other) are available for DataFrames, but they do not "break the lineage" and are thus unsuitable for methods that could loop for hundreds (or thousands) of iterations.
As an example, suppose that I have a list myfunctions of functions whose signature is DataFrame => DataFrame. I want a way to compute the following even when myfunctions has hundreds or thousands of entries:
def foo(dataset: DataFrame, g: DataFrame => Unit) =
  myfunctions.foldLeft(dataset) { case (df, f) =>
    val nextDF = f(df)
    g(nextDF)
    nextDF
  }
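For concreteness, here is a minimal sketch of how this might be set up and invoked; the entries in myfunctions and the initialDF value are hypothetical placeholders:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical stand-ins: in practice this list could have thousands of entries
val myfunctions: Seq[DataFrame => DataFrame] = Seq(
  _.withColumn("a", lit(1)),
  _.filter(col("a") >= 0),
  _.withColumn("b", col("a") * 2)
)

val result = foo(initialDF, df => println(df.count())) // initialDF: some starting DataFrame

Without lineage truncation, each iteration extends the logical plan of the previous one, which is exactly what makes long loops problematic.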
Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir(). New in version 2.1.
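So on Spark 2.1+ you can call checkpoint on the DataFrame itself; a minimal sketch (the directory path is illustrative):

// Spark 2.1+: checkpoint is available directly on Dataset/DataFrame
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // illustrative path

val checkpointedDF = df.checkpoint()  // eager by default: materializes immediately
// df.checkpoint(eager = false)       // lazy variant: materialized by the next action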
Azure Databricks uses the checkpoint directory to ensure correct and consistent progress information. When a stream is shut down, either purposely or accidentally, the checkpoint directory allows Azure Databricks to restart and pick up exactly where it left off.
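For Structured Streaming, the directory is supplied per query via the checkpointLocation option; a minimal sketch (the paths and streamingDF are illustrative):

// Progress and state are tracked under the checkpointLocation
val query = streamingDF.writeStream
  .format("parquet")
  .option("path", "/output/dir")                   // illustrative
  .option("checkpointLocation", "/checkpoint/dir") // illustrative
  .start()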
There are two types of Apache Spark checkpointing: reliable and local. Reliable checkpointing saves the actual RDD to a reliable distributed file system, e.g. HDFS. To set the checkpoint directory, call SparkContext.setCheckpointDir(directory: String).
Use SparkContext.setCheckpointDir to set the path to the checkpoint directory. Checkpointing can be local or reliable, which defines how reliable the checkpoint directory is. Local checkpointing writes checkpoint files to executor storage and, due to the executor lifecycle, is considered unreliable.
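A minimal sketch contrasting the two flavors (the directory path is illustrative):

sc.setCheckpointDir("hdfs:///checkpoints") // required for reliable checkpointing

val rdd1 = sc.parallelize(1 to 100)
rdd1.checkpoint()      // reliable: written to the checkpoint directory
rdd1.count()           // an action materializes the checkpoint

val rdd2 = sc.parallelize(1 to 100)
rdd2.localCheckpoint() // local: kept in executor storage; faster but unreliable
rdd2.count()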
TL;DR: For Spark versions up to 1.6, to actually get a "checkpointed DF", my suggested solution is based on another answer, but with one extra line:
df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema) // df2 is checkpointed
Explanation
Updated after further research.
As pointed out, checkpointing a DataFrame directly is not currently (Spark 1.6.1) possible, though there is an issue for it on Spark's Jira.
So, a possible workaround is the one suggested on another answer:
df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count          // An action to compute the checkpoint
However, with this approach, only the df.rdd object will be checkpointed. This can be verified by calling toDebugString on df.rdd:
scala> df.rdd.toDebugString
(32) MapPartitionsRDD[1] at rdd at <console>:38 []
 |  ReliableCheckpointRDD[2] at count at <console>:38 []
Then, calling toDebugString after a quick transformation to df (please note that I created my DataFrame from a JDBC source) returns the following:
scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |  MapPartitionsRDD[4] at rdd at <console>:38 []
 |  JDBCRDD[3] at rdd at <console>:38 []
df.explain also shows a hint:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)
So, to actually achieve a "checkpointed" DataFrame, I can only think of creating a new one from the checkpointed RDD:
val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map {
  case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")
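Of the two, createDataFrame(df.rdd, df.schema) is usually preferable, since it preserves the original schema without having to enumerate the columns and their types by hand.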
Then we can verify that the new DataFrame is "checkpointed":
1) newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]
2) newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |  MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |  MapPartitionsRDD[1] at rdd at <console>:38 []
 |  ReliableCheckpointRDD[2] at count at <console>:38 []
3) With transformation:
scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |  MapPartitionsRDD[11] at rdd at <console>:40 []
 |  MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |  MapPartitionsRDD[1] at rdd at <console>:38 []
 |  ReliableCheckpointRDD[2] at count at <console>:38 []
Also, I tried some more complex transformations and was able to verify, in practice, that the newDF object remained checkpointed.
Therefore, the only way I found to reliably checkpoint a DataFrame was by checkpointing its associated RDD and creating a new DataFrame object from it.
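Putting this together with the loop from the question, one could wrap the trick in a helper; a sketch along those lines (the helper name and the checkpoint-every-50 cadence are my own choices):

// Hypothetical helper: truncates the lineage by round-tripping through the RDD
def checkpointDF(df: DataFrame): DataFrame = {
  df.rdd.checkpoint()
  df.rdd.count() // an action to materialize the checkpoint
  sqlContext.createDataFrame(df.rdd, df.schema)
}

// Checkpoint every 50 iterations (cadence is arbitrary) to keep the plan small
def foo(dataset: DataFrame, g: DataFrame => Unit) =
  myfunctions.zipWithIndex.foldLeft(dataset) { case (df, (f, i)) =>
    val nextDF = if ((i + 1) % 50 == 0) checkpointDF(f(df)) else f(df)
    g(nextDF)
    nextDF
  }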
I hope it helps. Cheers.