 

Spark Streaming: How to periodically refresh cached RDD?

In my Spark Streaming application, I want to map values based on a dictionary that is retrieved from a backend (ElasticSearch). I want to refresh the dictionary periodically, in case it gets updated in the backend, similar to the periodic refresh capability of Logstash's translate filter. How could I achieve this with Spark (e.g. somehow unpersist the RDD every 30 seconds)?

asked Jun 05 '16 by lairtech

People also ask

Is caching relevant in Spark Streaming?

Caching and persistence are optimization techniques for interactive and iterative Spark computations. They save intermediate results so we can reuse them in subsequent stages. These intermediate RDDs are kept in memory (the default) or on more durable storage such as disk, and can also be replicated.
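As an illustration only (a minimal sketch, assuming a SparkContext named sc and a hypothetical input path), caching an intermediate RDD lets several actions reuse it instead of recomputing the whole lineage:

// hypothetical example: cache an intermediate result so later actions reuse it
val lines  = sc.textFile("hdfs:///logs/app.log")        // assumed input path
val errors = lines.filter(_.contains("ERROR")).cache()  // kept in memory after first use
val total  = errors.count()                             // first action: computes and caches
val byHost = errors.map(l => (l.split(" ")(0), 1L))     // second action reuses the cache
                   .reduceByKey(_ + _)
                   .collect()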

What is a batch interval in Spark Streaming?

A batch interval tells Spark for how long to collect data before processing it: if it is 1 minute, each batch contains the data of the last 1 minute (source: spark.apache.org). The data then arrives as a continuous stream of such batches, which is called a DStream.
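For example, a sketch (assuming an existing SparkConf named conf and a text source on localhost:9999) of a StreamingContext with a one-minute batch interval:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// every 60 seconds Spark Streaming cuts a new batch from the incoming stream
val ssc   = new StreamingContext(conf, Seconds(60))
val lines = ssc.socketTextStream("localhost", 9999)  // assumed source
lines.count().print()                                // one count per 60-second batch
ssc.start()
ssc.awaitTermination()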

What is a sliding interval in Spark Streaming?

The sliding interval is the amount of time, in seconds, by which the window shifts. In the previous example the sliding interval is 1, since the calculation is kicked off each second, i.e. at time=1, time=2, time=3... If you set the sliding interval to 2, you get a calculation at time=1, time=3, time=5...
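A sketch of such a windowed computation, assuming a DStream named lines built on a 1-second batch interval; a 3-second window sliding every 2 seconds emits a new result every 2 seconds, each covering the last 3 seconds of data:

// group the last 3 seconds of data, recomputed every 2 seconds
val windowed = lines.window(Seconds(3), Seconds(2))
windowed.count().print()   // one count every 2 seconds, over a 3-second window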

What is caching in Spark Streaming?

Both caching and persisting are used to save Spark RDDs, DataFrames, and Datasets. The difference is that the RDD cache() method saves to memory by default (MEMORY_ONLY), whereas persist() stores the data at a user-defined storage level.
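A minimal sketch, assuming two existing RDDs named rddA and rddB:

import org.apache.spark.storage.StorageLevel

rddA.cache()                                // shorthand for persist(StorageLevel.MEMORY_ONLY)
rddB.persist(StorageLevel.MEMORY_AND_DISK)  // explicit, user-chosen storage level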


1 Answer

The best way I've found to do that is to recreate the RDD and maintain a mutable reference to it. Spark Streaming is, at its core, a scheduling framework on top of Spark, so we can piggy-back on the scheduler to have the RDD refreshed periodically. For that, we use an empty DStream that we schedule only for the refresh operation:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ConstantInputDStream

// ssc: StreamingContext, sparkContext: SparkContext, refreshInterval: seconds (defined elsewhere)

// function that (re)creates the RDD we want to use as reference data
def getData(): RDD[Data] = ???

val dstream = ??? // our data stream

// a DStream of empty data, windowed so it fires once per refresh interval
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq()))
  .window(Seconds(refreshInterval), Seconds(refreshInterval))

var referenceData = getData()
referenceData.cache()

refreshDstream.foreachRDD { _ =>
  // evict the old RDD from memory and recreate it
  referenceData.unpersist(true)
  referenceData = getData()
  referenceData.cache()
}

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...

In the past, I've also tried interleaving only cache() and unpersist(), with no result (it refreshes only once). Recreating the RDD removes all lineage and provides a clean load of the new data.
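For completeness, a hedged sketch of how this could be wired to the dictionary use case from the question; loadDictionaryFromBackend() and extractKey() are hypothetical stand-ins (e.g. for the ElasticSearch read), and both sides are keyed so that the join operates on pair RDDs:

// hypothetical helper: fetch the current dictionary from the backend
def loadDictionaryFromBackend(): Map[String, String] = ???

// getData() for this use case: the dictionary as a keyed RDD
def getData(): RDD[(String, String)] =
  sparkContext.parallelize(loadDictionaryFromBackend().toSeq)

// key the stream so it can be joined against the reference data
val keyedStream    = dstream.map(event => (extractKey(event), event))  // extractKey is hypothetical
val myBusinessData = keyedStream.transform(rdd => rdd.join(referenceData))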

answered Sep 29 '22 by maasg