Spark createDataFrame(df.rdd, df.schema) vs checkPoint for breaking lineage

I'm currently using

val df = longLineageCalculation(....)
val newDf = sparkSession.createDataFrame(df.rdd, df.schema)
newDf.join......

This is to save time when Spark calculates plans. However, the docs say that checkpointing is the suggested way to "cut" lineage, but I don't want to pay the price of saving the RDD to disk.

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is of no benefit to me (I think).

What problems can arise from using "my" method? (The docs suggest checkpointing, which is more expensive, instead of this one for breaking lineage, and I would like to know why.)

The only thing I can guess is that if some node fails after my "lineage breaking", my process might fail where the checkpointed one would have worked correctly. (What if the DF is cached instead of checkpointed?)

Thanks!

EDIT:

From SMaZ's answer, my own knowledge, and the article he provided: using createDataFrame (which is a developer API, so use at "my"/your own risk) will keep the lineage in memory (not a problem for me, since I don't have memory issues and the lineage is not big).

With this, it looks (not 100% tested) like Spark should be able to rebuild whatever is needed if something fails.

As I'm not reusing the data in subsequent executions, I'll go with cache + createDataFrame rather than checkpointing (which, if I'm not wrong, is effectively cache + save to HDFS + "createDataFrame").
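For reference, the combination I mean looks roughly like this (a sketch only: the transformation chain is a made-up stand-in for my real job, and spark is the usual SparkSession):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("cut-lineage").getOrCreate()

// Stand-in for longLineageCalculation(): many chained transformations
var df = spark.range(0, 1000000).toDF("id")
for (_ <- 1 to 50) df = df.withColumn("id", col("id") + 1)

df.cache()    // keep the computed partitions in memory, no write to HDFS
df.count()    // materialize the cache

// New DataFrame whose logical plan is just this RDD, not the 50-step chain
val newDf = spark.createDataFrame(df.rdd, df.schema)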

My process is not that critical if it crashes: a user launches it manually and waits for the result, so if it runs into problems they can relaunch it (plus Spark will retry it) or call me. So I can take some risk anyway, but I'm 99% sure there's no risk :)

Asked Sep 02 '19 by BiS

People also ask

How to create a Dataframe in spark?

One easy way to create a Spark DataFrame is from an existing RDD. First, create an RDD from a collection (a Seq) by calling the parallelize() function on SparkContext; that "rdd" object can then be converted into a DataFrame, for example as sketched below.
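A minimal sketch (assuming a local SparkSession; names and data are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rdd-to-df").getOrCreate()
import spark.implicits._   // brings toDF() into scope

// Create an RDD from a Seq with parallelize()
val rdd = spark.sparkContext.parallelize(Seq(("Scala", 3000), ("Python", 100000)))

// Convert the RDD to a DataFrame with named columns
val dfFromRdd = rdd.toDF("language", "users_count")
dfFromRdd.show()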

What is a schema in spark?

In simple words, the schema is the structure of a Dataset or DataFrame: the column names, their data types, and whether they allow nulls. The entry point to Spark SQL is the SparkSession: its builder sets the Spark master URL (for example, to run locally), sets the name of the application, and getOrCreate() reuses an existing session if there is one, otherwise it creates a new one.
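For illustration, a schema can be declared explicitly as a StructType and attached when creating a DataFrame (a sketch; the column names, data, and SparkSession are assumptions):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("schema-demo").getOrCreate()

// The schema: column names, data types and nullability
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  IntegerType, nullable = true)
))

val rows = spark.sparkContext.parallelize(Seq(Row("Alice", 29), Row("Bob", 31)))
val people = spark.createDataFrame(rows, schema)
people.printSchema()   // prints the structure of the DataFrame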

How to get RDD lineage information in spark?

RDD lineage information can be printed with filteredRdd.toDebugString (where filteredRdd is the RDD in question). The DAG visualization in the Spark UI also shows the complete graph in an intuitive way. In Spark, the lineage graph is the graph of dependencies between existing RDDs and new RDDs.
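For instance (a sketch; filteredRdd is just a hypothetical RDD built from a parallelized collection, reusing the SparkSession from the sketches above):

val numbers = spark.sparkContext.parallelize(1 to 100)
val filteredRdd = numbers.filter(_ % 2 == 0).map(_ * 10)

// Prints the dependency chain, e.g. MapPartitionsRDD -> ... -> ParallelCollectionRDD
println(filteredRdd.toDebugString)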

How do I create a Dataframe from an existing RDD?

A typical task when working in Spark is to make a DataFrame from an existing RDD: create a sample RDD, convert it with the createDataFrame method, and check the variable's type to confirm it is a DataFrame.
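A minimal sketch of that pattern (the Person case class is illustrative; the SparkSession from the earlier sketches is assumed):

case class Person(name: String, age: Int)

// Sample RDD of case-class instances
val peopleRdd = spark.sparkContext.parallelize(Seq(Person("Alice", 29), Person("Bob", 31)))

// Convert it with createDataFrame; column names/types are inferred from the case class
val peopleDf = spark.createDataFrame(peopleRdd)

println(peopleDf.getClass)   // org.apache.spark.sql.Dataset (a DataFrame is a Dataset[Row])
peopleDf.show()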


1 Answer

Let me start with the line that creates the DataFrame:

val newDf = sparkSession.createDataFrame(df.rdd, df.schema)

If we take a close look at the SparkSession class, this method is annotated with @DeveloperApi. To understand what that annotation means, take a look at these lines from the DeveloperApi class:

A lower-level, unstable API intended for developers.

Developer APIs might change or be removed in minor versions of Spark.

So it is not advisable to use this method for production solutions; in the open-source world this is what's called a "use at your own risk" implementation.

However, let's dig deeper into what happens when we call createDataFrame with an RDD. It calls the private internalCreateDataFrame method and creates a LogicalRDD.

LogicalRDD is created when:

  • Dataset is requested to checkpoint
  • SparkSession is requested to create a DataFrame from an RDD of internal binary rows

So it is essentially the same as the checkpoint operation, but without saving the dataset physically. It just creates a DataFrame from an RDD of internal binary rows and a schema. This may truncate the lineage in memory (the logical plan), but nothing is persisted at the physical level.

So I believe it just adds the overhead of creating another RDD and cannot be used as a replacement for checkpoint.
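To see this in-memory truncation, one can compare the plans before and after (a sketch; the short withColumn chain stands in for a long lineage, and spark is the usual SparkSession):

import org.apache.spark.sql.functions.col

val df = spark.range(0, 1000).toDF("id")
  .withColumn("a", col("id") * 2)
  .withColumn("b", col("id") + 1)   // stand-in for a much longer chain

df.explain(true)       // the analyzed/optimized plan shows the whole chain

val newDf = spark.createDataFrame(df.rdd, df.schema)
newDf.explain(true)    // the plan is now rooted at a LogicalRDD ("Scan ExistingRDD")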

Now, checkpointing is the process of truncating the lineage graph and saving the data to a reliable distributed/local file system.

Why checkpoint?

  • If the computation takes a long time, the lineage is too long, or it depends on too many RDDs

  • Keeping heavy lineage information comes at a memory cost.

  • Checkpoint files are not deleted automatically even after the Spark application terminates, so they can be reused by other processes
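For reference, checkpointing in code looks roughly like this (a sketch; the checkpoint directory is illustrative and df is any DataFrame with a long lineage):

// Configure once per application: where checkpoint files are written
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

// Eager by default: materializes the DataFrame, saves it, and returns a
// DataFrame whose plan no longer carries the original lineage
val checkpointedDf = df.checkpoint()

// Lazy variant: nothing is saved until the first action on the result
// val lazyCheckpointedDf = df.checkpoint(eager = false)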

What problems can arise from using "my" method? (The docs suggest checkpointing, which is more expensive, instead of this one for breaking lineage, and I would like to know why.)

This article gives detailed information on cache and checkpoint. IIUC, your question is more about where we should use checkpoint, so let's discuss some practical scenarios where checkpointing is helpful:

  1. Let's take a scenario where we have one dataset on which we want to perform 100 iterative operations, and each iteration takes the previous iteration's result as input (Spark MLlib use cases). During this iterative process the lineage keeps growing. Checkpointing the dataset at a regular interval (say every 10 iterations, roughly as sketched after this list) ensures that in case of any failure we can restart the process from the last checkpoint.
  2. Let's take a batch example. Imagine a batch job that creates one master dataset with heavy lineage or complex computations. At regular intervals we then receive new data that should use the previously calculated master dataset. If we checkpoint the master dataset, it can be reused by all subsequent processes, even from a different SparkSession.
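A hedged sketch of scenario 1 (runOneIteration is a trivial stand-in for the real per-iteration computation, and the checkpoint directory is illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Trivial stand-in for one iteration's real computation
def runOneIteration(in: DataFrame): DataFrame = in.withColumn("value", col("value") + 1)

spark.sparkContext.setCheckpointDir("hdfs:///tmp/iter-checkpoints")

var current = spark.range(0, 1000000).toDF("value")
for (i <- 1 to 100) {
  current = runOneIteration(current)   // each step depends on the previous result
  if (i % 10 == 0) {
    current = current.checkpoint()     // every 10 iterations: cut lineage and save to storage
  }
}
current.count()   // final action; on failure, recomputation restarts from the last checkpoint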

My process is a batch process which is not-so-long and can be restarted without issues, so checkpointing is of no benefit to me (I think).

That's correct. If your process doesn't involve heavy computation or a big lineage, there is no point in checkpointing. The rule of thumb: if your dataset is not used multiple times and can be rebuilt faster than the time and resources it takes to checkpoint/cache it, then avoid checkpointing. That leaves more resources for your process.

Answered Nov 15 '22 by SMaZ