
Spark - Checkpointing implication on performance

From Spark's DStreamCheckpointData, it seems like the checkpointing mechanism collects the time window to be checkpointed and updates/writes it to checkpoint files. I am trying to understand a couple of things specifically:

  1. At every checkpoint interval, does it read all the previous checkpoint data and then update the current state? If so, what is the impact on performance when the checkpoint state grows very large? That would certainly slow down a long-running streaming context.

  2. Is there any general rule or formula to calculate the checkpoint interval for different data ingestion rates, sliding windows, and batch intervals?

asked May 04 '15 by Arpit1286

People also ask

Why does Spark perform checkpointing?

If any data is lost, the recovery should be speedy. Spark Streaming accomplishes this using checkpointing. Checkpointing is a process that truncates the RDD lineage graph and periodically saves the application state to reliable storage (HDFS).

Does Apache Spark provide checkpointing?

There are two types of Apache Spark checkpointing. Reliable checkpointing: the actual RDD is saved to reliable distributed storage, i.e. HDFS; we need to call the SparkContext.setCheckpointDir(directory: String) method to set the checkpointing directory (see the sketch after this section). Local checkpointing: the RDD is saved to executor-local storage, trading fault tolerance for speed.

Do Spark Streaming programs typically run continuously? Why?

Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).

What is RDD checkpointing in Spark?

RDD checkpointing is a process of truncating the RDD lineage graph and saving it to a reliable distributed (HDFS) or local file system. There are two types of checkpointing: reliable checkpointing, which saves the actual intermediate RDD data to a reliable distributed file system (e.g. Hadoop HDFS), and local checkpointing, which truncates the lineage graph and keeps the data in executor-local storage.
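As a minimal, hedged illustration of the RDD-level calls named above (the directory, app name, and toy dataset are my own choices for the sketch, not from the question):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RddCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-checkpoint").setMaster("local[2]"))

    // Reliable checkpointing: point this at fault-tolerant storage (an HDFS path) on a
    // real cluster; a local path is used here only to keep the sketch runnable.
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val doubled = sc.parallelize(1 to 1000000).map(_ * 2)
    doubled.checkpoint()       // reliable checkpoint: marked lazily, written on the first action
    println(doubled.count())   // the action materializes the RDD and triggers the checkpoint write

    // Local checkpointing: truncates the lineage using executor-local storage,
    // trading fault tolerance for speed.
    val approx = sc.parallelize(1 to 1000000).map(_ + 1)
    approx.localCheckpoint()
    println(approx.count())

    sc.stop()
  }
}
```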


1 Answer

  1. Yes, checkpointing is a blocking operation, so it stops processing while it runs. The length of time for which computation is stopped by this serialization of state depends on the write performance of whichever medium you're writing to (have you heard of Tachyon/Alluxio?).

    On the other hand, prior checkpointing data is not read on every new checkpointing operation: the stateful information is already maintained in Spark's cache as the stream is being operated upon (checkpoints are just a backup of it). Let's imagine the simplest state possible, a sum of all the integers met in a stream of integers: on each batch you compute a new value for this sum based on the data you see in the batch, and you can store this partial sum in cache (see above). Every five batches or so (depending on your checkpointing interval) you write this sum to disk. Now, if you lose one executor (one partition) in a subsequent batch, you can reconstruct the total by re-processing only that executor's partitions from up to the last five batches (by reading the disk to find the last checkpoint and re-processing the missing parts of the last up-to-five batches). But in normal processing (no incidents), you have no need to access the disk. (A minimal code sketch of this running-sum pattern appears at the end of this answer.)

  2. There is no general formula that I know of, since you would have to fix the maximum amount of data you're willing to re-process on recovery. Old documentation gives a rule of thumb.

    But in the case of streaming, you can think of your batch interval as a computation budget. Let's say you have a batch interval of 30 seconds. On each batch you have 30 seconds to allocate to writing to disk or to computing (the batch processing time). To make sure your job is stable, you have to ensure that your batch processing time does not go over budget; otherwise you will fill up the memory of your cluster (if it takes you 35 seconds to process and "flush" 30 seconds of data, then on each batch you ingest more data than you flush during the same time, and since your memory is finite, this eventually leads to an overflow).

    Let's say your average batch processing time is 25 seconds. So on each batch, you have 5 seconds of unallocated time in your budget. You can use that for checkpointing. Now consider how long checkpointing takes you (you can tease this out of the Spark UI). 10 seconds? 30 seconds? One minute?

    If it takes you c seconds to checkpoint on a bi seconds batch interval, with a bp seconds batch processing time, you will "recover" from checkpointing (process the data that still comes in during that time of no processing) in:

    ceil(c / (bi - bp)) batches.

    If it takes you k batches to "recover" from checkpointing (i.e. to absorb the lateness induced by the checkpoint), and you are checkpointing every p batches, you need to make sure you enforce k < p to avoid an unstable job. So in our example:

    • if it takes you 10 seconds to checkpoint, it will take you 10 / (30 - 25) = 2 batches to recover, so you can checkpoint every 2 batches (or more, i.e. less frequently, which I would advise, to account for unplanned losses of time).

    • if it takes you 30 seconds to checkpoint, it will take you 30 / (30 - 25) = 6 batches to recover, so you can checkpoint every 6 batches (or more).

    • if it takes you 60 seconds to checkpoint, you can checkpoint every 12 batches (or more).
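For concreteness, here is a small self-contained Scala sketch of this arithmetic. The object and method names are my own, and the numbers are simply the example values used above; Spark does not compute this for you:

```scala
object CheckpointBudget {
  /** Batches needed to absorb the lateness caused by one checkpoint write: k = ceil(c / (bi - bp)). */
  def recoveryBatches(c: Double, bi: Double, bp: Double): Int =
    math.ceil(c / (bi - bp)).toInt

  def main(args: Array[String]): Unit = {
    val bi = 30.0 // batch interval, seconds
    val bp = 25.0 // average batch processing time, seconds (read it off the Spark UI)
    for (c <- Seq(10.0, 30.0, 60.0)) {
      val k = recoveryBatches(c, bi, bp)
      // Checkpoint every k batches or more, i.e. pass Seconds((k * bi).toLong) to DStream.checkpoint.
      println(f"checkpoint write of $c%.0f s -> $k batches to recover -> checkpoint every $k batches or more")
    }
  }
}
```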

Note that this assumes your checkpointing time is constant, or at least can be bounded by a maximal constant. Sadly, this is often not the case: a common mistake is to forget to delete part of the state in stateful streams built with operations such as updateStateByKey or mapWithState, yet the size of the state should always be bounded. Note also that on a multitenant cluster, the time spent writing to disk is not always constant: other jobs may be trying to access the disk concurrently on the same executor, starving you of disk IOPS (in this talk, Cloudera reports on IO throughput degrading dramatically beyond 5 concurrent write threads).

Note that you should set the checkpoint interval explicitly, as the default is to checkpoint on the first batch that occurs more than the default checkpoint interval (10 s) after the last one. For our example of a 30 s batch interval, that means you checkpoint every other batch. This is often too frequent for pure fault-tolerance reasons (if reprocessing a few batches doesn't have that huge a cost), even if allowable within your computation budget, and it leads to the following kind of spikes in the performance graph:

[Performance graph: batch processing times with periodic spikes at each checkpoint]
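To tie the pieces together, here is a minimal Spark Streaming sketch of the running-sum state from point 1, with the checkpoint interval set explicitly as advised above. The socket source, local master, directory path, and 60-second (2-batch) interval are illustrative assumptions, not values from the question:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RunningSumCheckpoint {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("running-sum").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30)) // 30 s batch interval, as in the example

    // Metadata and state checkpoints go here; use an HDFS path on a real cluster.
    ssc.checkpoint("/tmp/streaming-checkpoints")

    // One integer per line arriving on a socket (illustrative source).
    val nums = ssc.socketTextStream("localhost", 9999).map(_.trim.toInt)

    // The "sum of all integers seen so far" state from point 1, kept under a single key.
    val updateSum: (Seq[Int], Option[Long]) => Option[Long] =
      (newValues, running) => Some(running.getOrElse(0L) + newValues.map(_.toLong).sum)
    val total = nums.map(n => ("total", n)).updateStateByKey(updateSum)

    // Set the checkpoint interval explicitly (here every 2 batches = 60 s) instead of
    // relying on the default discussed above.
    total.checkpoint(Seconds(60))
    total.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```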

answered by Francois G