
Periodic Broadcast in Apache Spark Streaming

I am implementing a stream learner for text classification. There are some single-valued parameters in my implementation that need to be updated as new stream items arrive. For example, I want to change the learning rate as new predictions are made. However, I doubt there is a way to broadcast variables after the initial broadcast, so what happens if I need to broadcast a variable every time I update it? If there is a way to do this, or a workaround for what I want to accomplish in Spark Streaming, I'd be happy to hear about it.

Thanks in advance.

asked Feb 18 '15 by bfaskiplar


People also ask

Do Spark streaming programs run continuously?

Users specify a streaming computation by writing a batch computation (using Spark's DataFrame/Dataset API), and the engine automatically incrementalizes this computation (runs it continuously).
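As a minimal illustration of that model (a sketch, assuming a socket source on localhost:9999; any streaming source works the same way):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class StructuredStreamingSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("IncrementalQuery").getOrCreate();

        // Describe the computation as if it were a batch query over a table
        Dataset<Row> lines = spark.readStream()
                .format("socket")           // placeholder source
                .option("host", "localhost")
                .option("port", 9999)
                .load();

        // The engine incrementalizes this query and runs it continuously
        StreamingQuery query = lines.writeStream()
                .outputMode("append")
                .format("console")
                .start();

        query.awaitTermination();
    }
}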

Does Spark automatically broadcast?

In every stage, Spark automatically broadcasts the common data needed by tasks. Data broadcast this way is cached in serialized form and deserialized on each node before each task runs.

What method does Spark use to perform streaming operations?

Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
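A minimal DStream sketch of that flow (assuming a socket source and a 10-second batch interval):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("DStreamSketch").setMaster("local[2]");
        // Incoming data is grouped into 10-second batches
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Each batch of lines becomes one RDD inside the DStream
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.count().print();   // processed batch by batch by the Spark engine

        jssc.start();
        jssc.awaitTermination();
    }
}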

What is broadcast in Apache spark?

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
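A minimal sketch of that usage (the lookup-table contents are just placeholders):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("BroadcastSketch").setMaster("local[2]"));

        // Ship a small lookup table to every executor once, instead of with every task
        Map<String, Integer> lookup = new HashMap<>();
        lookup.put("spam", 1);
        lookup.put("ham", 0);
        Broadcast<Map<String, Integer>> broadcastLookup = sc.broadcast(lookup);

        JavaRDD<String> labels = sc.parallelize(Arrays.asList("spam", "ham", "spam"));
        // Executors read the cached copy; the broadcast value itself is read-only
        JavaRDD<Integer> encoded = labels.map(word -> broadcastLookup.value().get(word));
        System.out.println(encoded.collect());

        sc.stop();
    }
}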


2 Answers

I got this working by creating a wrapper class over the broadcast variable. The wrapper's updateAndGet method returns the refreshed broadcast variable. I call this method inside dStream.transform, as allowed by the Spark documentation:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

The Transform Operation section states: "the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches."

The BroadcastWrapper class looks like this:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

You can then call updateAndGet inside stream.transform, which allows RDD-to-RDD transformations:

objectStream.transform(rdd -> {

    Broadcast<ReferenceData> refData = BroadcastWrapper.getInstance().updateAndGet(rdd.context());

    /** Your code to manipulate the RDD; return the transformed RDD **/
});
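For the learning-rate use case from the question, the same pattern might look like the sketch below; it reuses BroadcastWrapper and ReferenceData from above, while the Prediction type, classify helper, and getLearningRate getter are hypothetical placeholders, not part of the original answer:

// A minimal sketch, assuming textStream is a JavaDStream<String> of documents
JavaDStream<Prediction> predictions = textStream.transform(rdd -> {
    // Re-broadcast (at most once per minute) before processing this batch
    Broadcast<ReferenceData> params =
            BroadcastWrapper.getInstance().updateAndGet(rdd.context());

    return rdd.map(text -> {
        double learningRate = params.value().getLearningRate(); // hypothetical getter
        return classify(text, learningRate);                    // hypothetical model call
    });
});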

Refer to my full answer in this post: https://stackoverflow.com/a/41259333/3166245

Hope it helps

answered Oct 02 '22 by Aastha


My understanding is that once a broadcast variable has been sent out, it is read-only. I believe you can update the broadcast variable on the local node, but not on remote nodes.

Maybe you need to consider doing this outside Spark. How about using a NoSQL store (Cassandra, etc.) or even Memcached? You could then update the variable from one task and periodically check the store from other tasks.
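A hedged sketch of that idea, assuming a JavaDStream<String> named textStream, a Redis instance reachable at localhost:6379, and a "learning-rate" key (all placeholders; any external store would do). Here the value is re-read once per batch on the driver and shipped to the tasks via the closure:

import org.apache.spark.streaming.api.java.JavaDStream;
import redis.clients.jedis.Jedis;

// Re-read the parameter from the external store once per batch instead of re-broadcasting it
textStream.foreachRDD(rdd -> {
    final double learningRate;
    try (Jedis jedis = new Jedis("localhost", 6379)) {           // placeholder host/port
        learningRate = Double.parseDouble(jedis.get("learning-rate")); // placeholder key
    }
    rdd.foreach(record -> {
        // ... use learningRate when scoring/updating the model for this record ...
    });
});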

answered Oct 02 '22 by Sujee Maniyam