spark streaming checkpoint recovery is very very slow

  • Goal: Read from Kinesis and store data in to S3 in Parquet format via spark streaming.
  • Situation: Application runs fine initially, running batches of 1hour and the processing time is less than 30 minutes on average. For some reason lets say the application crashes, and we try to restart from checkpoint. The processing now takes forever and does not move forward. We tried to test out the same thing at batch interval of 1 minute, the processing runs fine and takes 1.2 minutes for batch to finish. When we recover from checkpoint it takes about 15 minutes for each batch.
  • Notes: we are using s3 for checkpoints using 1 executor, with 19g mem & 3 cores per executor

Attaching the screenshots:

First Run - Before checkpoint Recovery Before checkpoint - Streaming Page

Before checkpoint - Jobs Page

Before checkpoint - Jobs Page2

Trying to Recover from checkpoint: After checkpoint - Streaming Page After checkpoint - Jobs Page


object Config {

  val sparkConf = new SparkConf

  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)

  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")


  val numStreams = 2

  def getSparkContext(): SparkContext = {

  def getSqlContext(): HiveContext = {



object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
    streams.foreachRDD(jsonRDDRaw =>{
      println(s"Old partitions ${jsonRDDRaw.partitions.length}")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions ${jsonRDD.partitions.length}")

        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

          val str = new String(f)

            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)



object Kinesis{

  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentails : BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole){
      new AmazonKinesisClient()
      awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentails)

    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")

    val kinesisStreams = (0 until Config.numStreams).map { i =>
        println(s"creating stream for $i")
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)


    val unionStreams = streamingContext.union(kinesisStreams)


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {

    val sc = Config.getSparkContext

    if(sc.defaultParallelism < Config.numStreams+1){
      throw  new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))

//    sys.ShutdownHookThread {
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    }







2 Answers

raised a Jira issue : https://issues.apache.org/jira/browse/SPARK-19304

The issue is because we read more data per iteration than what is required and then discard the data. This can be avoided by adding a limit to getResults aws call.

Fix: https://github.com/apache/spark/pull/16842

When a failed driver is restart, the following occurs:

  1. Recover computation – The checkpointed information is used to restart the driver, reconstruct the contexts and restart all the receivers.
  2. Recover block metadata – The metadata of all the blocks that will be necessary to continue the processing will be recovered.
  3. Re-generate incomplete jobs – For the batches with processing that has not completed due to the failure, the RDDs and corresponding jobs are regenerated using the recovered block metadata.
  4. Read the block saved in the logs – When those jobs are executed, the block data is read directly from the write ahead logs. This recovers all the necessary data that were reliably saved to the logs.
  5. Resend unacknowledged data – The buffered data that was not saved to the log at the time of failure will be sent again by the source. as it had not been acknowledged by the receiver.

Since all these steps are performed at driver your batch of 0 events take so much time. This should happen with the first batch only then things will be normal.

Reference here.

