In Spark Streaming, how do you detect an empty batch?
Let's take the stateful streaming word count example: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java. Is it possible to print the word count RDD only when new words are added to the stream?
RDD.isEmpty returns true if and only if the RDD contains no elements at all. Note that an RDD may be empty even when it has at least one partition.
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
Using sc.parallelize() we can create an empty RDD that still has partitions; writing such a partitioned RDD to a file produces multiple part files.
The batch interval tells Spark how long to collect data before forming a batch: if it is 1 minute, each batch holds the data received over the last minute (source: spark.apache.org). Data then arrives as a continuous series of such batches, and this continuous stream of data is called a DStream.
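The batching idea can be illustrated outside Spark with plain Python. This is only a sketch of the concept: it groups timestamped events into fixed-interval buckets, the way a DStream slices a live stream into one RDD per batch interval. The event data and the 60-second interval are made up for illustration.

```python
from collections import defaultdict

def batch_events(events, interval_secs):
    """Group (timestamp, value) pairs into fixed-interval batches,
    mimicking how a DStream slices a stream by batch interval."""
    buckets = defaultdict(list)
    for ts, value in events:
        # Events in the same interval land in the same batch.
        buckets[ts // interval_secs].append(value)
    return [buckets[k] for k in sorted(buckets)]

# Events at t=0s and t=30s fall in the first 60-second batch,
# the event at t=65s in the second.
batch_events([(0, "a"), (30, "b"), (65, "c")], 60)
# → [["a", "b"], ["c"]]
```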
Here's how I did it. Create an empty RDD that serves as your previousWindow. Then, inside foreachRDD, compute the difference between the previous window and the current window: if the current window contains records not in the previous window, there is something new in the batch. Finally, set the previous window to the contents of the current window.
...
var previousWindowRdd = sc.emptyRDD[String]

dStream.foreachRDD { windowRdd =>
  // Skip entirely empty batches before doing any work.
  if (!windowRdd.isEmpty()) processWindow(windowRdd.cache())
}
...
def processWindow(windowRdd: RDD[String]) = {
  // Records present in this window but not in the previous one.
  val newInBatch = windowRdd.subtract(previousWindowRdd)
  if (!newInBatch.isEmpty())
    processNewBatch(windowRdd)
  previousWindowRdd = windowRdd
}
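The core of this trick, keeping the previous window around and acting only when the current one contains unseen records, can be sketched in plain Python with set subtraction (no Spark involved; the function and variable names below are invented for illustration, and a Python set difference stands in for RDD.subtract):

```python
def detect_new(windows):
    """Yield, per window, the records not present in the previous window.
    Mirrors the windowRdd.subtract(previousWindowRdd) logic above."""
    previous = set()
    for window in windows:
        current = set(window)
        new_in_batch = current - previous   # analogue of RDD.subtract
        if new_in_batch:                    # analogue of !newInBatch.isEmpty()
            yield sorted(new_in_batch)
        previous = current

# The second window repeats the first, so it is silently skipped;
# the third introduces "c", so only that record is reported.
list(detect_new([["a", "b"], ["a", "b"], ["b", "c"]]))
# → [["a", "b"], ["c"]]
```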