Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark streaming data sharing between batches

Spark streaming processes the data in micro batches.

Each interval data is processed in parallel using RDDs with out any data sharing between each interval.

But my use case needs to share the data between intervals.

Consider the Network WordCount example which produces the count of all words received in that interval.

How would I produce following word count ?

  • Relative count for the words "hadoop" and "spark" with the previous interval count

  • Normal word count for all other words.

Note: UpdateStateByKey does the Stateful processing but this applies function on every record instead of particular records.

So, UpdateStateByKey doesn't fit for this requirement.

Update:

consider the following example

Interval-1

Input:

Sample Input with Hadoop and Spark on Hadoop

output:

hadoop  2
sample  1
input   1
with    1
and 1
spark   1
on  1

Interval-2

Input:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark

output:

another 3
hadoop  1
spark   2
and 2
sample  1
input   1
with    1
on  1

Explanation:

1st interval gives the normal word count of all words.

In the 2nd interval hadoop occurred 3 times but the output should be 1 (3-2)

Spark occurred 3 times but the output should be 2 (3-1)

For all other words it should give the normal word count.

So, while processing 2nd Interval data, it should have the 1st interval's word count of hadoop and spark

This is a simple example with illustration.

In actual use case, fields that need data sharing are part of the RDD element(RDD) and huge no of values needs to be tracked.

i.e, in this example like hadoop and spark keywords nearly 100k keywords to be tracked.

Similar usecases in Apache Storm:

Distributed caching in storm

Storm TransactionalWords

like image 567
Vijay Innamuri Avatar asked May 05 '15 09:05

Vijay Innamuri


People also ask

What is batch interval in Spark streaming?

A batch interval tells spark that for what duration you have to fetch the data, like if its 1 minute, it would fetch the data for the last 1 minute. source: spark.apache.org. So the data would start pouring in a stream in batches, this continuous stream of data is called DStream.

What is DStream and what is the difference between batch and DStream in Spark streaming?

Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.

What is a sliding interval in Spark streaming?

sliding interval - is amount of time in seconds for how much the window will shift. For example in previous example sliding interval is 1 (since calculation is kicked out each second) e.g. at time=1, time=2, time=3... if you set sliding interval=2, you will get calculation at time=1, time=3, time=5...

Is Spark streaming micro batch?

Micro-batch loading technologies include Fluentd, Logstash, and Apache Spark Streaming. Micro-batch processing is very similar to traditional batch processing in that data are usually processed as a group. The primary difference is that the batches are smaller and processed more often.


1 Answers

This is possible by "remembering" the last RDD received and using a left join to merge that data with the next streaming batch. We make use of streamingContext.remember to enable RDDs produced by the streaming process to be kept for the time we need them.

We make use of the fact that dstream.transform is an operation that executes on the driver and therefore we have access to all local object definitions. In particular we want to update the mutable reference to the last RDD with the required value on each batch.

Probably a piece of code makes that idea more clear:

// configure the streaming context to remember the RDDs produced
// choose at least 2x the time of the streaming interval
ssc.remember(xx Seconds)  

// Initialize the "currentData" with an empty RDD of the expected type
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD

// classic word count
val w1dstream = dstream.map(elem => (elem,1))    
val count = w1dstream.reduceByKey(_ + _)    

// Here's the key to make this work. Look how we update the value of the last RDD after using it. 
val diffCount = count.transform{ rdd => 
                val interestingKeys = Set("hadoop", "spark")               
                val interesting = rdd.filter{case (k,v) => interestingKeys(k)}                                
                val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))}
                currentData = interesting
                countDiff                
               }

diffCount.print()
like image 72
maasg Avatar answered Oct 12 '22 15:10

maasg