I'm trying to understand how to make a Spark Streaming app more fault tolerant (specifically when writing to downstream dependencies), and I don't know what the best way is to handle failures when writing results to an external source, like Cassandra, DynamoDB, etc.
For example, I have a Spark Streaming job that pulls data from a stream (Kafka, Flume, etc.; I haven't finalized which technology to use yet), aggregates similar items together, and then writes the results to an external store (i.e. Cassandra, DynamoDB, or whatever is receiving the results of my DStream computations).
I'm trying to figure out how to handle the case where the external dependency is not available for writes. Maybe the cluster went down, maybe there are permission problems, etc., but my job cannot write the results to the external dependency. Is there a way to pause Spark Streaming so that the receivers don't continue to batch data? Should I just sleep the current batch and let the receiver continue to store up batches? If the problem is transient (a few seconds), continuing to batch may be acceptable, but what happens if the dependency goes down for a few minutes or an hour or more?
One thought I had was to have a monitor process that watches the health of the dependencies in the background, and if it finds that a dependency is unhealthy, it stops the job. Then, when all the dependencies are healthy again, I can start the job back up and process all the data that was not written to the external source.
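Something like this is roughly what I have in mind (a rough sketch only; isHealthy() is just a placeholder for whatever health check applies to the dependency):

import org.apache.spark.streaming.StreamingContext

// Rough sketch: a daemon thread that polls a placeholder isHealthy() check and
// stops the streaming context gracefully when a dependency looks down.
def startHealthMonitor(ssc: StreamingContext, isHealthy: () => Boolean): Thread = {
  val t = new Thread(new Runnable {
    def run(): Unit = {
      while (isHealthy()) Thread.sleep(10000)   // poll every 10 seconds
      ssc.stop(stopSparkContext = false, stopGracefully = true)
    }
  })
  t.setDaemon(true)
  t.start()
  t
}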
Another thought I had was to somehow signal, in the DStream's foreachRDD method, that there was a problem. Is there some kind of exception that I can throw in the DStream that will signal back to the driver that it should stop?
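For instance, would something like this work (DependencyDownException and writeToExternalStore are just placeholders I made up)?

import org.apache.spark.rdd.RDD

// Placeholder exception type, not anything provided by Spark.
class DependencyDownException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

def writeToExternalStore[T](rdd: RDD[T]): Unit = ???   // placeholder output logic

dstream.foreachRDD { rdd =>
  try {
    writeToExternalStore(rdd)
  } catch {
    case e: Exception =>
      // Does rethrowing here fail the output operation and get rethrown by
      // ssc.awaitTermination() on the driver, so the whole app stops?
      throw new DependencyDownException("external store unavailable", e)
  }
}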
If anyone has any experience on how to handle External Fault Tolerance, or can point me to good articles/videos on it, that would be great.
Thanks
I believe there is no simple and universal answer here. A lot depends on application semantics, the type of data source (reliable receiver, unreliable receiver, file based, receiver-less) and the requirements.
In general you should never let the application fail over a single IO failure. Assuming you have some output action:

def outputAction[T](rdd: RDD[T]): Unit = ???

at least make sure that it won't propagate an exception to the driver:

stream foreachRDD { rdd => Try(outputAction(rdd)) }
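For illustration, a hypothetical outputAction could be a per-partition write; writeToStore here is just a placeholder for whatever Cassandra/DynamoDB client call you actually use:

import org.apache.spark.rdd.RDD

def writeToStore[T](record: T): Unit = ???   // placeholder client call

def outputAction[T](rdd: RDD[T]): Unit =
  rdd.foreachPartition { partition =>
    // a real implementation would typically open one connection per partition here
    partition.foreach(record => writeToStore(record))
  }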
The question remains: what next? The simplest thing you can do is to drop the given window. Depending on the application that may or may not be an acceptable solution, but in general there are many cases where losing some data is perfectly acceptable.
It can be further improved by keeping track of the failures and taking some other action if some threshold has been reached.
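A minimal sketch of that bookkeeping, using a driver-side counter (failureThreshold and escalate are assumptions, not anything Spark provides):

import java.util.concurrent.atomic.AtomicInteger
import scala.util.{Failure, Success, Try}

val consecutiveFailures = new AtomicInteger(0)
val failureThreshold = 5           // assumption: tune per application
def escalate(): Unit = ???         // e.g. alert, stop the context, switch outputs

stream foreachRDD { rdd =>
  Try(outputAction(rdd)) match {
    case Success(_) => consecutiveFailures.set(0)
    case Failure(_) =>
      if (consecutiveFailures.incrementAndGet() >= failureThreshold) escalate()
      // otherwise the window is simply dropped
  }
}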
If dropping data is not acceptable, the next step is to retry after some delay:
def outputActionWithDelay[T](d: Duration)(rdd: RDD[T]): Unit = ???

stream foreachRDD { rdd =>
  Try(outputAction(rdd))
    .recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) }
    .recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) }
    ...
}
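outputActionWithDelay itself can be as simple as a blocking sleep before retrying (a sketch assuming scala.concurrent.duration.Duration; note that blocking here delays the batch, so subsequent batches will queue up):

import org.apache.spark.rdd.RDD
import scala.concurrent.duration.Duration

def outputActionWithDelay[T](d: Duration)(rdd: RDD[T]): Unit = {
  Thread.sleep(d.toMillis)   // blocks this output operation; later batches queue up
  outputAction(rdd)
}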
The number of retries and the delay duration will vary from case to case and depend on the source and the ability to store incoming data.
What can you do when you hit the last retry? For starters you can add an alternative output. Instead of using the primary output you can, for example, push everything to reliable external file storage and worry about it later. This may not be applicable if the output requires a specific order of incoming data, but otherwise it should be worth trying.
def alternativeOutputAction[T](rdd: RDD[T]): Unit = ???

stream foreachRDD { rdd =>
  Try(outputAction(rdd))
    .recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) }
    ...
    .recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) }
    .recoverWith { case _ => Try(alternativeOutputAction(rdd)) }
}
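A hypothetical alternativeOutputAction could simply dump the failed window to reliable file storage for later replay (the path and text format are assumptions; use whatever reliable storage and format you have):

import org.apache.spark.rdd.RDD

// Sketch: park failed windows on HDFS/S3 so they can be replayed later.
def alternativeOutputAction[T](rdd: RDD[T]): Unit =
  rdd.map(_.toString).saveAsTextFile(
    s"hdfs:///recovery/failed-windows/${System.currentTimeMillis}")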
If that fails, it is probably a symptom of serious problems and there is not much we can do at the application level. We can go back to the first approach and simply hope the situation resolves itself soon, or choose a more sophisticated approach.
If the input source can buffer the data and we use reliable storage and replication, then we can enable checkpointing and simply kill the application.
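Checkpointing itself is the standard Spark Streaming mechanism; the usual pattern looks roughly like this (checkpointDir, the app name and the batch interval are placeholders), so that after killing the app a restart picks up from the checkpoint:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"   // assumption: any reliable storage

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("my-app"), Seconds(10))
  ssc.checkpoint(checkpointDir)
  // set up the DStream graph, including the foreachRDD output action, here
  ssc
}

// On a clean start this builds a new context; after a kill and restart it is
// reconstructed from the checkpoint data instead.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()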
If you try to recover, it is probably a good idea to add some variant of a circuit breaker, so that if the application has recently encountered multiple failures trying to reach the primary output, it drops the recovery attempts (and their delays) entirely.
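A bare-bones sketch of that circuit-breaker idea (the threshold is an assumption; any proper circuit-breaker library would also add a cool-down before closing again). When the breaker is open, the primary output and its delayed retries are skipped and the fallback is used directly:

import java.util.concurrent.atomic.AtomicInteger
import scala.util.{Failure, Success, Try}

class SimpleCircuitBreaker(maxFailures: Int) {
  private val failures = new AtomicInteger(0)
  def isOpen: Boolean = failures.get() >= maxFailures
  def recordSuccess(): Unit = failures.set(0)
  def recordFailure(): Unit = failures.incrementAndGet()
}

val breaker = new SimpleCircuitBreaker(maxFailures = 3)   // assumption: tune as needed

stream foreachRDD { rdd =>
  if (breaker.isOpen) {
    Try(alternativeOutputAction(rdd))      // skip the primary output and its retries
  } else {
    Try(outputAction(rdd)) match {
      case Success(_) => breaker.recordSuccess()
      case Failure(_) => breaker.recordFailure()
      // the delayed retries and the fallback would slot in here as shown above
    }
  }
}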