 

In Spark Streaming, how to detect an empty batch?

Tags:

apache-spark


Let's take the stateful streaming word count example: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java. Is it possible to print the word count RDD only when new words are added to the stream?

Pinch asked Mar 18 '15 23:03





1 Answer

Here's how I did it. Create an empty RDD to serve as your previousWindow. Then, in foreachRDD, compute the difference between the current window and the previous window. If the current window contains records that are not in the previous window, there is something new in the batch. Finally, set the previous window to the current window.

  ...

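  // Holds the previous batch. The explicit RDD[String] type allows it to be
  // reassigned to a regular batch RDD later on.
  var previousWindowRdd: RDD[String] = sc.emptyRDD[String]

  dStream.foreachRDD { windowRdd =>
    // Skip batches that received no records at all.
    if (!windowRdd.isEmpty()) processWindow(windowRdd.cache())
  }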

  ...

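def processWindow(windowRdd: RDD[String]): Unit = {
  // Records present in this batch but absent from the previous one.
  val newInBatch = windowRdd.subtract(previousWindowRdd)

  // Only hand the batch on when it contains something new.
  if (!newInBatch.isEmpty())
    processNewBatch(windowRdd)

  // Remember this batch for the next comparison.
  previousWindowRdd = windowRdd
}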
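
To see which batches this logic lets through without starting a cluster, here is a minimal sketch that models each batch as a plain Scala Seq[String] instead of an RDD. The object NewWordsDetector and its two methods are hypothetical names made up for this illustration, not part of Spark. The filterNot step is only meant to mimic what subtract does in the code above.

```scala
// Hypothetical stand-in for the streaming job: each Seq[String] plays the
// role of one batch RDD. Nothing here is a Spark API.
object NewWordsDetector {

  // Mimics windowRdd.subtract(previousWindowRdd): keeps the elements of
  // `current` that do not occur anywhere in `previous`.
  def newInBatch(current: Seq[String], previous: Seq[String]): Seq[String] = {
    val seen = previous.toSet
    current.filterNot(seen)
  }

  // For every batch, reports whether it would be passed to processNewBatch.
  def batchesWithNewWords(batches: Seq[Seq[String]]): Seq[Boolean] = {
    var previous: Seq[String] = Seq.empty
    val flags = Seq.newBuilder[Boolean]
    for (batch <- batches) {
      if (batch.isEmpty) {
        // Empty batch: skipped entirely, the previous window stays as it was.
        flags += false
      } else {
        flags += newInBatch(batch, previous).nonEmpty
        previous = batch
      }
    }
    flags.result()
  }
}
```

For the batches Seq("a", "b"), Seq(), Seq("a", "b"), Seq("b", "c"), batchesWithNewWords gives true, false, false, true. Note that the comparison is only against the immediately preceding non-empty batch, so a word that was last seen two batches ago counts as new again.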
Kevin Pauli answered Oct 19 '22 17:10
