
Spark DStream periodically call saveAsObjectFile using transform does not work as expected

I read data from Kafka using the DirectKafkaStream API [1], do some transformations, update a count, and then write the data back to Kafka. This piece of code is actually in a test:

kafkaStream[Key, Value]("test")
      .map(record => (record.key(), 1))
      .updateStateByKey[Int](
        (numbers: Seq[Int], state: Option[Int]) =>
          state match {
            case Some(s) => Some(s + numbers.length)
            case _ => Some(numbers.length)
          }
      )
      .checkpoint(this)("count") {
        case (save: (Key, Int), current: (Key, Int)) =>
          (save._1, save._2 + current._2)
      }
      .map(_._2)
      .reduce(_ + _)
      .map(count => (new Key, new Result[Long](count.toLong)))
      .toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName)

The checkpoint operator is an enrichment of the DStream API that I've created, which saves one RDD of the given DStream, for one Time, into HDFS using saveAsObjectFile. In practice it saves the result of every 60th micro-batch (RDD) into HDFS.

Checkpoint does the following:

def checkpoint(processor: Streaming)(name: String)(
    mergeStates: (T, T) => T): DStream[T] = {
  val path = processor.configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(processor.getClass) + "/" + name + "/"
  logInfo(s"Checkpoint base path is [$path].")

  processor.registerOperator(name)

  if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
    val restorePath = path + processor.restorationPoint.get.ID.stringify
    logInfo(s"Restoring from path [$restorePath].")
    checkpointData = context.objectFile[T](restorePath).cache()

    stream
      .transform((rdd: RDD[T], time: Time) => {
        val merged = rdd
          .union(checkpointData)
          .map[(Boolean, T)](record => (true, record))
          .reduceByKey(mergeStates)
          .map[T](_._2)

        processor.maybeCheckpoint(name, merged, time)

        merged
      })
  } else {
    stream
      .transform((rdd: RDD[T], time: Time) => {
        processor.maybeCheckpoint(name, rdd, time)

        rdd
      })
  }
}

The effective piece of code is the following:

dstream.transform((rdd: RDD[T], time: Time) => {
  processor.maybeCheckpoint(name, rdd, time)

  rdd
})

The dstream variable in the code above is the result of the previous operator, which is updateStateByKey, so transform is called right after updateStateByKey. maybeCheckpoint does the following:

def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
  if (doCheckpoint(time)) {
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
    val newPath = configuration.get[String](
      "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
      Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
    logInfo(s"Saving new checkpoint to [$newPath].")
    rdd.saveAsObjectFile(newPath)
    registerCheckpoint(name, Operator(name), time)
    logInfo(s"Checkpoint completed for operator [$name].")
  }
}

As you can see, most of the code is just bookkeeping, but saveAsObjectFile is effectively called.

The problem is that even though the resulting RDDs from updateStateByKey should be persisted automatically, when saveAsObjectFile is called on every Xth micro-batch, Spark recomputes everything from scratch, from the beginning of the streaming job, starting by reading everything from Kafka again. I've tried to force cache or persist with different storage levels, on the DStreams as well as on the RDDs.
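
For illustration, forcing persistence at both levels looked roughly like the sketch below; the forceCaching helper and the checkpointRDD callback are hypothetical stand-ins, not the exact code of the job.

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Sketch of the kind of caching that was attempted (names are placeholders).
def forceCaching[T: ClassTag](stream: DStream[T],
                              checkpointRDD: (RDD[T], Time) => Unit): DStream[T] = {
  // Persist the DStream so every generated RDD is kept at the chosen level.
  stream.persist(StorageLevel.MEMORY_AND_DISK_SER)

  // Additionally cache the per-batch RDD before the side-effecting save.
  stream.transform((rdd: RDD[T], time: Time) => {
    rdd.cache()
    checkpointRDD(rdd, time)
    rdd
  })
}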

Micro-batches:

[screenshot: micro-batches]

DAG for job 22:

[screenshot: DAG for job 22]

DAG for the job that runs saveAsObjectFile:

[screenshots: SAOF1, SAOF2]

What could be the problem?

Thanks!

[1] Using Spark 2.1.0.

asked Mar 31 '17 19:03 by Dyin


1 Answer

I believe using transform to periodically checkpoint will cause unexpected cache behaviour.

Instead, using foreachRDD to perform the periodic checkpointing will allow the DAG to remain stable enough to cache RDDs effectively.

I'm almost positive that was the solution to a similar issue we had a while ago.
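
A minimal sketch of this approach, assuming the question's maybeCheckpoint is passed in as a callback and wrapping it in a hypothetical helper named checkpointViaForeachRDD:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Sketch only: keep the transformation chain untouched and do the periodic
// saveAsObjectFile in an output operation instead of inside transform.
def checkpointViaForeachRDD[T](stream: DStream[T],
                               checkpointRDD: (RDD[T], Time) => Unit): DStream[T] = {
  // Cache so the periodic save reuses each batch's RDD instead of recomputing
  // the lineage all the way back to Kafka.
  stream.cache()

  // Output operation: runs the (possibly no-op) checkpoint for every batch,
  // e.g. delegating to maybeCheckpoint, without altering the DAG that
  // downstream operators depend on.
  stream.foreachRDD((rdd: RDD[T], time: Time) => checkpointRDD(rdd, time))

  stream
}

A hypothetical call site, mirroring the question's enrichment, would be checkpointViaForeachRDD(stream, processor.maybeCheckpoint(name, _, _)); the downstream operators then consume the returned stream unchanged, while the save runs as a separate output operation.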

answered by ImDarrenG