Spark Streaming processes data in micro-batches.
Each interval's data is processed in parallel using RDDs, without any data sharing between intervals.
But my use case needs to share data between intervals.
Consider the Network WordCount example, which produces the count of all words received in an interval.
How would I produce the following word count?
Relative count for the words "hadoop" and "spark", compared with the previous interval's count.
Normal word count for all other words.
Note: updateStateByKey does stateful processing, but it applies the update function to every record rather than to particular records.
So updateStateByKey doesn't fit this requirement.
Update:
Consider the following example.
Interval-1
Input:
Sample Input with Hadoop and Spark on Hadoop
output:
hadoop 2
sample 1
input 1
with 1
and 1
spark 1
on 1
Interval-2
Input:
Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
output:
another 3
hadoop 1
spark 2
and 2
sample 1
input 1
with 1
on 1
Explanation:
The 1st interval gives the normal word count of all words.
In the 2nd interval, hadoop occurred 3 times, but the output should be 1 (3-2).
Spark occurred 3 times, but the output should be 2 (3-1).
For all other words it should give the normal word count.
So, while processing the 2nd interval's data, the job needs the 1st interval's word counts for hadoop and spark.
This is a simple example for illustration.
In the actual use case, the fields that need data sharing are part of the RDD element (RDD), and a huge number of values needs to be tracked.
That is, where this example tracks the keywords hadoop and spark, the real job has nearly 100k keywords to track.
Similar use cases in Apache Storm:
Distributed caching in storm
Storm TransactionalWords
The batch interval tells Spark how long to collect data for each batch: if it is 1 minute, each batch contains the data received during the last minute (source: spark.apache.org). The data therefore arrives as a sequence of batches, and this continuous stream of data is called a DStream.
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
Sliding interval: the amount of time, in seconds, by which the window shifts. In the previous example the sliding interval is 1 (the calculation is kicked off every second), i.e. at time=1, time=2, time=3... If you set the sliding interval to 2, the calculation runs at time=1, time=3, time=5...
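As a rough illustration of the sliding interval, here is a minimal sketch using reduceByKeyAndWindow. The socket source on localhost:9999, the object name WindowedWordCount, and the chosen durations (1 second batches, a 4 second window, a 2 second slide) are assumptions made for this example, not values from the text above. Both the window length and the slide interval have to be multiples of the batch interval.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedWordCount")
    // Batch interval of 1 second (assumed for this sketch).
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumed input: lines of text arriving on a local socket.
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

    // Count words over the last 4 seconds, recomputed every 2 seconds.
    // The third argument is the sliding interval.
    val windowedCounts =
      pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(4), Seconds(2))

    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With a slide of 2 seconds, output is produced on every second batch rather than on every batch.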
Micro-batch loading technologies include Fluentd, Logstash, and Apache Spark Streaming. Micro-batch processing is very similar to traditional batch processing in that data are usually processed as a group. The primary difference is that the batches are smaller and processed more often.
This is possible by "remembering" the last RDD received and using a left join to merge that data with the next streaming batch. We make use of streamingContext.remember to enable RDDs produced by the streaming process to be kept for the time we need them.
We also make use of the fact that the function passed to dstream.transform is executed on the driver for every batch, and therefore we have access to all local object definitions. In particular, we want to update the mutable reference to the last RDD with the required value on each batch.
A piece of code probably makes the idea clearer:
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds

// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(Seconds(xx))
// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD
// classic word count
val w1dstream = dstream.map(elem => (elem, 1))
val count = w1dstream.reduceByKey(_ + _)
// Here's the key to make this work. Look how we update the value of the last RDD after using it.
val diffCount = count.transform { rdd =>
  val interestingKeys = Set("hadoop", "spark")
  // keep only the tracked keywords; their counts become the baseline for the next batch
  val interesting = rdd.filter { case (k, _) => interestingKeys(k) }
  // subtract the previous count where one exists (only tracked keywords have one)
  val countDiff = rdd.leftOuterJoin(currentData).map { case (k, (v1, v2)) => (k, v1 - v2.getOrElse(0)) }
  currentData = interesting
  countDiff
}
diffCount.print()
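To check the logic against the example from the question without a cluster, the same steps can be simulated with plain Scala collections. This is only a sketch: the object RelativeWordCount and its helpers wordCount and diff are hypothetical names introduced here, Map stands in for the RDD, and `previous.getOrElse(k, 0)` plays the role of the leftOuterJoin.

```scala
object RelativeWordCount {
  // Keywords whose counts are reported relative to the previous interval.
  val trackedKeys: Set[String] = Set("hadoop", "spark")

  // Plain, case-insensitive word count for one interval's input.
  def wordCount(line: String): Map[String, Int] =
    line.toLowerCase.split("\\s+").filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.length) }

  // Stand-in for the leftOuterJoin step: subtract the previous count where one exists.
  def diff(current: Map[String, Int], previous: Map[String, Int]): Map[String, Int] =
    current.map { case (k, v) => (k, v - previous.getOrElse(k, 0)) }

  def main(args: Array[String]): Unit = {
    val intervals = Seq(
      "Sample Input with Hadoop and Spark on Hadoop",
      "Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark"
    )
    // Plays the role of the mutable "currentData" reference.
    var previous = Map.empty[String, Int]
    for (line <- intervals) {
      val counts = wordCount(line)
      val output = diff(counts, previous)
      // Remember only the tracked keywords, with their raw counts, for the next interval.
      previous = counts.filter { case (k, _) => trackedKeys(k) }
      println(output.toSeq.sortBy(_._1).mkString(", "))
    }
  }
}
```

For the second interval this yields hadoop 1 and spark 2, while the other words keep their normal counts, which matches the expected output in the question.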