I read data from Kafka using the DirectKafkaStream API [1], do some transformations, update a count, then write the data back to Kafka. Actually, this piece of code is in a test:
kafkaStream[Key, Value]("test")
  .map(record => (record.key(), 1))
  .updateStateByKey[Int](
    (numbers: Seq[Int], state: Option[Int]) =>
      state match {
        case Some(s) => Some(s + numbers.length)
        case _       => Some(numbers.length)
      }
  )
  .checkpoint(this)("count") {
    case (save: (Key, Int), current: (Key, Int)) =>
      (save._1, save._2 + current._2)
  }
  .map(_._2)
  .reduce(_ + _)
  .map(count => (new Key, new Result[Long](count.toLong)))
  .toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName)
The checkpoint operator is an enrichment of the DStream API that I've created. It is supposed to save one RDD of the given DStream, for one Time, into HDFS using saveAsObjectFile. In practice it saves the result of every 60th micro-batch (RDD) to HDFS.
Checkpoint does the following:
def checkpoint(processor: Streaming)(name: String)(
    mergeStates: (T, T) => T): DStream[T] = {
  val path = processor.configuration.get[String](
    "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
    Reflection.canonical(processor.getClass) + "/" + name + "/"
  logInfo(s"Checkpoint base path is [$path].")

  processor.registerOperator(name)

  if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
    val restorePath = path + processor.restorationPoint.get.ID.stringify
    logInfo(s"Restoring from path [$restorePath].")
    checkpointData = context.objectFile[T](restorePath).cache()

    stream
      .transform((rdd: RDD[T], time: Time) => {
        val merged = rdd
          .union(checkpointData)
          .map[(Boolean, T)](record => (true, record))
          .reduceByKey(mergeStates)
          .map[T](_._2)

        processor.maybeCheckpoint(name, merged, time)

        merged
      })
  } else {
    stream
      .transform((rdd: RDD[T], time: Time) => {
        processor.maybeCheckpoint(name, rdd, time)
        rdd
      })
  }
}
The piece of code that effectively runs is the following:

dstream.transform((rdd: RDD[T], time: Time) => {
  processor.maybeCheckpoint(name, rdd, time)
  rdd
})
Here the dstream variable in the code above is the result of the previous operator, updateStateByKey, so transform is called right after updateStateByKey.
def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
  if (doCheckpoint(time)) {
    logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
    val newPath = configuration.get[String](
      "processing.spark.streaming.checkpoint-directory-prefix") + "/" +
      Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
    logInfo(s"Saving new checkpoint to [$newPath].")
    rdd.saveAsObjectFile(newPath)
    registerCheckpoint(name, Operator(name), time)
    logInfo(s"Checkpoint completed for operator [$name].")
  }
}
As you can see, most of the code is just bookkeeping, but saveAsObjectFile is effectively called.
The problem is that even though the resulting RDDs from updateStateByKey should be persisted automatically, when saveAsObjectFile is called on every Xth micro-batch, Spark recomputes everything from scratch, from the beginning of the streaming job, starting by reading everything from Kafka again. I've tried to put and force cache or persist, with different storage levels, on the DStreams as well as on the RDDs.
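For illustration, those attempts looked roughly like this sketch (not the exact code; counts stands in for the DStream returned by updateStateByKey):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Sketch of the caching attempts: persist the stateful DStream itself...
def forceCaching(counts: DStream[(String, Int)]): DStream[(String, Int)] = {
  counts.persist(StorageLevel.MEMORY_AND_DISK)    // also tried cache() and other storage levels

  // ...and persist each RDD inside the transform, right before it is saved.
  counts.transform((rdd: RDD[(String, Int)], time: Time) => {
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd
  })
}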
(Screenshots: the list of micro-batches; the DAG for job 22; the DAG for the job that runs saveAsObjectFile.)
What could be the problem?
Thanks!
[1] Using Spark 2.1.0.
I believe using transform to periodically checkpoint will cause unexpected cache behaviour. Instead, using foreachRDD to perform the periodic checkpointing will allow the DAG to remain stable enough to effectively cache RDDs.
I'm almost positive that was the solution to a similar issue we had a while ago.
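Roughly, something like this sketch (assuming your existing maybeCheckpoint, which already decides via doCheckpoint(time) whether a given batch should be saved; this is not your exact code):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Sketch only: the periodic save becomes an output operation via foreachRDD,
// so it no longer sits between updateStateByKey and the downstream operators.
def checkpointViaForeach[T](stream: DStream[T],
                            name: String,
                            save: (String, RDD[T], Time) => Unit): DStream[T] = {
  stream.cache()                                   // keep the stateful RDDs cached
  stream.foreachRDD((rdd: RDD[T], time: Time) => {
    save(name, rdd, time)                          // e.g. processor.maybeCheckpoint(name, rdd, time)
  })
  stream                                           // downstream operators keep using the same DStream
}

The idea is that foreachRDD is a pure output operation, so the save doesn't add an extra step to the lineage that downstream operators depend on, and the cached RDDs from updateStateByKey have a much better chance of being reused.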