Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

spark streaming checkpoint recovery is very very slow

  • Goal: Read from Kinesis and store data in to S3 in Parquet format via spark streaming.
  • Situation: Application runs fine initially, running batches of 1hour and the processing time is less than 30 minutes on average. For some reason lets say the application crashes, and we try to restart from checkpoint. The processing now takes forever and does not move forward. We tried to test out the same thing at batch interval of 1 minute, the processing runs fine and takes 1.2 minutes for batch to finish. When we recover from checkpoint it takes about 15 minutes for each batch.
  • Notes: we are using s3 for checkpoints using 1 executor, with 19g mem & 3 cores per executor

Attaching the screenshots:

First Run - Before checkpoint Recovery Before checkpoint - Streaming Page

Before checkpoint - Jobs Page

Before checkpoint - Jobs Page2

Trying to Recover from checkpoint: After checkpoint - Streaming Page After checkpoint - Jobs Page

Config.scala

object Config {

  val sparkConf = new SparkConf


  val sc = new SparkContext(sparkConf)

  val sqlContext = new HiveContext(sc)


  val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
  val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)

  val checkpointDirectory =  sc.hadoopConfiguration.get("checkpointDirectory")

//  sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

  DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))

  val numStreams = 2

  def getSparkContext(): SparkContext = {
    this.sc
  }

  def getSqlContext(): HiveContext = {
    this.sqlContext
  }





}

S3Basin.scala

object S3Basin {
  def main(args: Array[String]): Unit = {
    Kinesis.startStreaming(s3basinFunction _)
  }

  def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
    streams.foreachRDD(jsonRDDRaw =>{
      println(s"Old partitions ${jsonRDDRaw.partitions.length}")
      val jsonRDD = jsonRDDRaw.coalesce(10,true)
      println(s"New partitions ${jsonRDD.partitions.length}")

      if(!jsonRDD.isEmpty()){
        val sqlContext =  SQLContext.getOrCreate(jsonRDD.context)

        sqlContext.read.json(jsonRDD.map(f=>{
          val str = new String(f)
          if(str.startsWith("{\"message\"")){
            str.substring(11,str.indexOf("@version")-2)
          }
          else{
            str
          }
        })).registerTempTable("events")

        sqlContext.sql(
          """
            |select
            |to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
            |hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
            |*
            |from events
          """.stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)


        sqlContext.dropTempTable("events")
      }
    })
  }
}

Kinesis.scala

object Kinesis{


  def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
    val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1)))   // new context
    streamingContext.checkpoint(Config.checkpointDirectory)   // set checkpoint directory
    val sc = Config.getSparkContext

    var awsCredentails : BasicAWSCredentials = null
    val kinesisClient = if(Config.useIAMInstanceRole){
      new AmazonKinesisClient()
    }
    else{
      awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
      new AmazonKinesisClient(awsCredentails)
    }


    val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
    val appName = sc.hadoopConfiguration.get("kinesis.appName")

    val streamName = sc.hadoopConfiguration.get("kinesis.streamName")

    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size

    val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))

    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
    // on sequence number of records that have been received. Same as batchInterval for this
    // example.
    val kinesisCheckpointInterval = batchInterval

    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
    // DynamoDB of the same region as the Kinesis stream
    val regionName = sc.hadoopConfiguration.get("kinesis.regionName")


    val kinesisStreams = (0 until Config.numStreams).map { i =>
        println(s"creating stream for $i")
        if(Config.useIAMInstanceRole){
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

        }else{
          KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
            InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)

        }
      }

    val unionStreams = streamingContext.union(kinesisStreams)
    streamFunc(unionStreams)

    streamingContext
  }


  def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {

    val sc = Config.getSparkContext

    if(sc.defaultParallelism < Config.numStreams+1){
      throw  new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")
    }

    val streamingContext =  StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))


//    sys.ShutdownHookThread {
//      println("Gracefully stopping Spark Streaming Application")
//      streamingContext.stop(true, true)
//      println("Application stopped greacefully")
//    }
//

    streamingContext.start()
    streamingContext.awaitTermination()


  }




}

DAG DAG

enter image description here

like image 705
Gaurav Shah Avatar asked Jul 15 '16 07:07

Gaurav Shah


People also ask

What is checkpointing in Spark Streaming?

What is Spark Streaming Checkpoint. A process of writing received records at checkpoint intervals to HDFS is checkpointing. It is a requirement that streaming application must operate 24/7. Hence, must be resilient to failures unrelated to the application logic such as system failures, JVM crashes, etc.

Is Spark Streaming real-time?

Spark Streaming is an extension of the core Spark API that allows data engineers and data scientists to process real-time data from various sources including (but not limited to) Kafka, Flume, and Amazon Kinesis. This processed data can be pushed out to file systems, databases, and live dashboards.

What is batch duration in Spark Streaming?

A batch interval tells spark that for what duration you have to fetch the data, like if its 1 minute, it would fetch the data for the last 1 minute. source: spark.apache.org. So the data would start pouring in a stream in batches, this continuous stream of data is called DStream.

Do Spark Streaming programs run continuously?

Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).


Video Answer


2 Answers

raised a Jira issue : https://issues.apache.org/jira/browse/SPARK-19304

The issue is because we read more data per iteration than what is required and then discard the data. This can be avoided by adding a limit to getResults aws call.

Fix: https://github.com/apache/spark/pull/16842

like image 86
Gaurav Shah Avatar answered Nov 15 '22 09:11

Gaurav Shah


When a failed driver is restart, the following occurs:

  1. Recover computation – The checkpointed information is used to restart the driver, reconstruct the contexts and restart all the receivers.
  2. Recover block metadata – The metadata of all the blocks that will be necessary to continue the processing will be recovered.
  3. Re-generate incomplete jobs – For the batches with processing that has not completed due to the failure, the RDDs and corresponding jobs are regenerated using the recovered block metadata.
  4. Read the block saved in the logs – When those jobs are executed, the block data is read directly from the write ahead logs. This recovers all the necessary data that were reliably saved to the logs.
  5. Resend unacknowledged data – The buffered data that was not saved to the log at the time of failure will be sent again by the source. as it had not been acknowledged by the receiver.

enter image description here Since all these steps are performed at driver your batch of 0 events take so much time. This should happen with the first batch only then things will be normal.

Reference here.

like image 32
Amit Kumar Avatar answered Nov 15 '22 09:11

Amit Kumar