I am implementing a stream learner for text classification. My implementation has some single-valued parameters that need to be updated as new stream items arrive. For example, I want to change the learning rate as new predictions are made. However, I doubt that there is a way to broadcast variables again after the initial broadcast. So what happens if I need to broadcast a variable every time I update it? If there is a way to do this, or a workaround for what I want to accomplish in Spark Streaming, I'd be happy to hear about it.
Thanks in advance.
Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).
Spark automatically broadcasts the common data needed by the tasks within each stage. Data broadcast this way is cached in serialized form and deserialized on each node before each task runs.
Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
I got this working by creating a wrapper class around the broadcast variable. The wrapper's updateAndGet method returns the refreshed broadcast variable. I call this method inside dStream.transform, as described in the Spark documentation:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
Transform Operation states: "the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches."
The BroadcastWrapper class looks like this:
import java.util.Calendar;
import java.util.Date;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {
    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();
    private static final BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        // Let's say we want to refresh every 1 min = 60000 ms
        if (broadcastVar == null || diff > 60000) {
            // Release the stale copies cached on the executors
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());
            // Your logic to refresh; getRefData() stands in for it
            ReferenceData data = getRefData();
            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}
You can call updateAndGet inside the stream's transform method, which allows RDD-to-RDD transformations:
objectStream.transform(rdd -> {
  Broadcast<ReferenceData> refData = BroadcastWrapper.getInstance().updateAndGet(rdd.context());
  /** Your code to manipulate the RDD, reading refData.value() **/
  return rdd; // return the (possibly transformed) RDD
});
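The refresh decision in updateAndGet can be tried out without a Spark cluster. Below is a minimal sketch of the same "reload when older than N milliseconds" logic, assuming a hypothetical RefreshingHolder class with an injectable loader and clock (neither is part of Spark or of the answer above). In the real wrapper, the loader would correspond to re-broadcasting the refreshed data.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not a Spark API: caches a value and reloads it
// once it is older than maxAgeMs, mirroring the check in updateAndGet.
final class RefreshingHolder<T> {
    private final Supplier<T> loader;   // stands in for "refresh and re-broadcast"
    private final LongSupplier clock;   // injectable so the logic can be tested
    private final long maxAgeMs;
    private T value;
    private long loadedAt;

    RefreshingHolder(Supplier<T> loader, LongSupplier clock, long maxAgeMs) {
        this.loader = loader;
        this.clock = clock;
        this.maxAgeMs = maxAgeMs;
    }

    // Returns the cached value, reloading it first if it is missing or stale.
    synchronized T updateAndGet() {
        long now = clock.getAsLong();
        if (value == null || now - loadedAt > maxAgeMs) {
            value = loader.get();
            loadedAt = now;
        }
        return value;
    }
}
```

In production code the clock would typically be System::currentTimeMillis; passing it in just keeps the timing logic deterministic under test.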
Refer to my full answer in this post: https://stackoverflow.com/a/41259333/3166245
Hope it helps
My understanding is that once a broadcast variable has been sent out, it is read-only. I believe you can update the broadcast variable on the local node, but the change will not reach remote nodes.
Maybe you need to consider doing this outside Spark. How about using a NoSQL store (Cassandra, etc.) or even Memcached? You could then update the variable from one task and periodically check the store from the other tasks.
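A rough sketch of that idea, under stated assumptions: ParamStore is a hypothetical interface standing in for the external store (Cassandra, Memcached, or similar), InMemoryParamStore is only a local stand-in for trying the code out, and PolledParam is an illustrative reader that re-checks the store once every checkEvery calls instead of on every record. None of these names come from Spark or from any client library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical abstraction over an external key-value store.
interface ParamStore {
    void put(String key, double value);
    double get(String key, double defaultValue);
}

// Local stand-in used here only so the sketch runs without a real store.
final class InMemoryParamStore implements ParamStore {
    private final Map<String, Double> data = new ConcurrentHashMap<>();

    public void put(String key, double value) {
        data.put(key, value);
    }

    public double get(String key, double defaultValue) {
        return data.getOrDefault(key, defaultValue);
    }
}

// Reader a task could hold: it re-reads the store only every checkEvery
// calls, so most calls are served from the locally cached value.
final class PolledParam {
    private final ParamStore store;
    private final String key;
    private final double defaultValue;
    private final int checkEvery;
    private long calls;
    private double cached;

    PolledParam(ParamStore store, String key, double defaultValue, int checkEvery) {
        this.store = store;
        this.key = key;
        this.defaultValue = defaultValue;
        this.checkEvery = checkEvery;
    }

    double current() {
        if (calls % checkEvery == 0) {
            cached = store.get(key, defaultValue);
        }
        calls++;
        return cached;
    }
}
```

The trade-off is staleness versus load on the store: a larger checkEvery means fewer lookups, but a task may keep using an old learning rate for longer.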