Spark streams: enrich stream with reference data

I have Spark Streaming set up so that it reads from a socket, enriches the data, and then publishes it to a RabbitMQ queue. The enrichment looks up information in a Map that was instantiated by reading a regular text file (Source.fromFile...) before setting up the streaming context.
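For context, here is a minimal sketch of that setup (the host, port, file name, and tab-separated key/value format are assumptions for illustration):

import scala.io.Source
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// build the reference map on the driver before starting the streaming context
val lookup: Map[String, String] = Source.fromFile("lookup.txt")
  .getLines()
  .map { line => val Array(k, v) = line.split('\t'); k -> v }
  .toMap

// setMaster("local[2]") is just for running the sketch locally
val conf = new SparkConf().setAppName("enrich").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)

// the map is captured in the closure and serialized with every task
val enriched = lines.map(elem => (elem, lookup.getOrElse(elem, "not found")))
enriched.print() // stand-in for publishing to RabbitMQ

ssc.start()
ssc.awaitTermination()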

I have a feeling that this is not really the way it should be done. On the other hand, with a StreamingContext I can only read from streams, not from static files as I would be able to with a SparkContext.

I could try to allow multiple contexts but I'm not sure if this is the right way either.

Any advice would be greatly appreciated.

asked Dec 19 '22 by botkop

2 Answers

Assuming the map used for enrichment is small enough to be held in memory, the recommended way to make that data available in a Spark job is through broadcast variables. The content of such a variable is sent to each executor only once, avoiding the overhead of serializing a dataset captured in a closure with every task.

Broadcast variables are wrappers instantiated in the driver; the data is 'unwrapped' inside a closure by calling the broadcastVar.value method.

This would be an example of how to use broadcast variables with a DStream:

// could be replaced with Source.fromFile as well; this is just more practical
val data = sc.textFile("lookup.txt").map(toKeyValue).collectAsMap()
// declare the broadcast variable
val bcastData = sc.broadcast(data)

... initialize streams ...

val enrichedDStream = socketDStream.map { elem =>
    // every step is spelled out here for illustrative purposes; usually one would just chain these calls
    // get the map out of the broadcast wrapper
    val lookupMap = bcastData.value
    // use the map to look up some data
    val lookupValue = lookupMap.getOrElse(elem, "not found")
    // create the desired result
    (elem, lookupValue)
}
enrichedDStream.saveTo...
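A note on the design: calling bcastData.value inside the closure is cheap, because the broadcast payload is fetched once per executor and cached locally; each task merely dereferences that local copy instead of having the map shipped along with it.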
answered Dec 27 '22 by maasg

If your file is small and not on a distributed file system, Source.fromFile is fine (whatever gets the job done).

If you want to read files via the SparkContext, you can still access it through streamingContext.sparkContext and combine the resulting RDD with the DStream in transform or foreachRDD.
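A minimal sketch of the transform variant, assuming ssc and socketDStream from the question, and a tab-separated lookup file (the file name and parsing are illustrative):

// read the reference data once as an RDD via the underlying SparkContext
val refRDD = ssc.sparkContext
  .textFile("lookup.txt")
  .map { line => val Array(k, v) = line.split('\t'); k -> v }
  .cache() // reused on every batch, so keep it in memory

val enriched = socketDStream
  .map(elem => (elem, ()))            // key the stream by element
  .transform(_.leftOuterJoin(refRDD)) // RDD-to-RDD join per micro-batch
  .map { case (elem, (_, v)) => (elem, v.getOrElse("not found")) }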

answered Dec 27 '22 by Marius Soutier