
Spark Streaming with a dynamic lookup table

I'm currently looking into using Spark Streaming to take in logfile-like entries and to do some computation on them for statistical reasons.

There are reference datasets held on HDFS, currently accessible from HBase and Hive, which I need in order to look up and transform some of the data, such as mappings between IPs, machine names, and machine owners.

The Spark application is expected to run on our cluster day in, day out, for weeks without a restart. However, these reference tables are updated every few hours.

It is okay if the data used is slightly old, but it is not okay for the data to be two weeks old. Therefore, I want to know how I can look data up for transformations and enrichments in my map and reduce phases. I've had a couple of ideas.

  1. Broadcast variables can read in the dataset and pass it around efficiently. However, once a broadcast variable is set, it cannot be altered, and fetching the data again in the driver class, unpersisting the old variable and broadcasting a new one won't work, because the workers' pointers all still point to the old dataset. I don't know if there's a way to get around that.

  2. HBase get() queries can be made. If the data is routed to reducers based on the lookup key, each reducer only ever sees a subset of the overall dataset, so it can hold its own local cache of that subset. HBase should have minimal latency when fetching single records. (A sketch of this idea follows the list.)

  3. Something else?
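For illustration, here is a minimal sketch of idea 2 in Scala, assuming the stream is a DStream[(String, String)] of (ip, logLine) pairs called events, the HBase 1.x client API, and a hypothetical lookup table "ip_lookup" with column family "d" and qualifier "owner" (connection cleanup and null handling omitted for brevity):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.streaming.dstream.DStream

    // Enrich each record with an HBase get(), memoising results in a
    // per-partition cache so repeated keys are only fetched once.
    def enrich(events: DStream[(String, String)]): DStream[(String, String, String)] =
      events.mapPartitions { iter =>
        val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val table = conn.getTable(TableName.valueOf("ip_lookup"))   // placeholder table name
        val cache = scala.collection.mutable.Map.empty[String, String]

        iter.map { case (ip, logLine) =>
          val owner = cache.getOrElseUpdate(ip, {
            val result = table.get(new Get(Bytes.toBytes(ip)))
            Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("owner")))
          })
          (ip, logLine, owner)
        }
      }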

Simon Hollingshead, asked Feb 01 '15


1 Answer

You have two options here.

The first is to use the foreachRDD operation on top of your DStream. The foreachRDD closure is executed on the driver side, which means you can create any new RDD there. You can keep a time counter and re-read the file from HDFS every 10-15 minutes.
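As a rough sketch of this first option, assuming the reference data is a tab-separated ip-to-owner file on HDFS, the stream is a DStream[(String, String)] of (ip, logLine) pairs named events, and the paths and refresh interval are purely illustrative:

    import org.apache.spark.rdd.RDD

    var lookupRdd: RDD[(String, String)] = null          // ip -> owner
    var lastLoad  = 0L
    val refreshMs = 15 * 60 * 1000L                      // re-read every 15 minutes

    events.foreachRDD { rdd =>
      val now = System.currentTimeMillis()
      if (lookupRdd == null || now - lastLoad > refreshMs) {
        if (lookupRdd != null) lookupRdd.unpersist()
        // This block runs on the driver, so building a fresh RDD here is fine
        lookupRdd = rdd.sparkContext
          .textFile("hdfs:///reference/ip_to_owner.tsv") // assumed location
          .map { line => val Array(ip, owner) = line.split("\t"); (ip, owner) }
          .cache()
        lastLoad = now
      }
      rdd.join(lookupRdd)                                // (ip, (logLine, owner))
         .map { case (ip, (logLine, owner)) => s"$ip\t$owner\t$logLine" }
         .saveAsTextFile(s"hdfs:///enriched/batch-$now") // placeholder sink
    }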

The second is to read the file inside the transform transformation over the DStream and keep the result in memory. With this approach, each executor has to read the whole lookup table for itself, which is not efficient.

I'd recommend the first approach. To be even more precise, you can store a flag somewhere recording when the data was last updated, and keep a copy of that flag in your Spark application. On each iteration you check the value of this flag (stored, for instance, in HBase or Zookeeper) and compare it to the one stored locally: if they differ, you re-read the lookup table; if not, you perform the operation with the old one.
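A minimal sketch of that refinement, in the same style as the previous one. The answer suggests keeping the flag in HBase or Zookeeper; purely to keep the example self-contained, this version reads a hypothetical one-line "last updated" marker file from HDFS instead:

    var loadedVersion: String = null                     // version of the table we last loaded

    events.foreachRDD { rdd =>
      // One small read per batch; swap this for an HBase get() or a
      // Zookeeper node read in a real deployment.
      val currentVersion =
        rdd.sparkContext.textFile("hdfs:///reference/ip_to_owner.version").first()

      if (currentVersion != loadedVersion) {
        // Re-read the lookup table exactly as in the previous sketch
        loadedVersion = currentVersion
      }
      // ... enrichment with the (possibly refreshed) lookup table as before ...
    }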

0x0FFF, answered Oct 07 '22