 

How can I update a broadcast variable in spark streaming?

I have, I believe, a relatively common use case for Spark Streaming:

I have a stream of objects that I would like to filter based on some reference data.

Initially, I thought that this would be a very simple thing to achieve using a Broadcast Variable:

public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast =
        sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.value();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, my reference data will change periodically, albeit infrequently.

I was under the impression that I could modify and re-broadcast my variable on the driver and it would be propagated to each of the workers; however, the Broadcast object is not Serializable and needs to be final.

What alternatives do I have? The three solutions I can think of are:

  1. Move the reference data lookup into a forEachPartition or foreachRDD so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to somehow store a timer / counter to stop the remote API being hit for every element in the stream (a sketch of this approach follows the list).

  2. Restart the Spark Context every time the refdata changes, with a new Broadcast Variable.

  3. Convert the Reference Data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object.
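
A minimal sketch of option 1, assuming a hypothetical RefDataCache helper around the getRefData() REST lookup from the question (the class and constant names are illustrative, not from any library):

// Hypothetical per-JVM cache for option 1: each executor fetches the
// reference data at most once per TTL instead of once per element.
public class RefDataCache {
    private static volatile ReferenceData cached;
    private static volatile long lastFetchMs = 0L;
    private static final long TTL_MS = 60_000; // refresh at most once per minute

    public static synchronized ReferenceData get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - lastFetchMs > TTL_MS) {
            cached = getRefData(); // the REST call from the question
            lastFetchMs = now;
        }
        return cached;
    }

    private static ReferenceData getRefData() {
        // placeholder: wire up the actual REST client here
        throw new UnsupportedOperationException("REST lookup not implemented");
    }
}

// Usage: one cache lookup per partition rather than per element
objectStream.foreachRDD(rdd -> {
    rdd.foreachPartition(iter -> {
        ReferenceData refData = RefDataCache.get();
        while (iter.hasNext()) {
            MyObject obj = iter.next();
            if (obj.getField().equals(refData.getField())) {
                // Final processing of filtered objects
            }
        }
    });
    return null;
});

Because the static field lives in each executor's JVM, the TTL check happens locally and the REST endpoint is hit at most once per minute per executor.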

asked Oct 27 '15 by Andrew Stubbs


People also ask

Where is broadcast variable stored in Spark?

A broadcast variable is stored on the driver's BlockManager as a single value and also, separately, as smaller chunks.

What is a broadcast variable in Spark?

A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
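
For context, creating and reading a broadcast variable looks like this (the classic example from the Spark documentation, with sc being an existing JavaSparkContext):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// sc is an existing JavaSparkContext
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
int[] data = broadcastVar.value(); // reads the locally cached copy on any executor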

How do I remove a broadcast variable in Spark?

There is a way to remove broadcast variables from the memory of all executors. Calling unpersist() on a broadcast variable removes the data of the broadcast variable from the memory cache of all executors to free up resources.
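
In code, using the refdataBroadcast variable from the question:

refdataBroadcast.unpersist();     // asynchronously remove cached copies from executors
refdataBroadcast.unpersist(true); // or block until every executor has dropped it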

What is the maximum size of broadcast variable in Spark?

The maximum size for the broadcast table is 8GB. Spark also internally maintains a threshold on table size to decide when to automatically apply broadcast joins. The threshold can be configured via a Spark configuration property.
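
The property referred to above is presumably spark.sql.autoBroadcastJoinThreshold (10 MB by default); one way to tune it:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .set("spark.sql.autoBroadcastJoinThreshold",
         String.valueOf(100 * 1024 * 1024)); // raise the limit to 100 MB
// or disable automatic broadcast joins entirely:
// conf.set("spark.sql.autoBroadcastJoinThreshold", "-1");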


2 Answers

Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static final BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null) {
                broadcastVar.unpersist();
            }
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

Your code would look like this:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        // updateAndGet runs on the driver once per batch interval, so the
        // broadcast is refreshed without restarting the streaming context
        Broadcast<ReferenceData> refdataBroadcast =
            BroadcastWrapper.getInstance().updateAndGet(stream.context());

        return stream.filter(obj -> obj.getField().equals(refdataBroadcast.value().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

This worked for me on a multi-node cluster as well. Hope this helps.

answered Sep 27 '22 by Aastha


I recently faced this issue myself. Thought it might be helpful for Scala users.

The Scala way of writing a BroadcastWrapper looks like the example below.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* This wrapper lets us update broadcast variables within DStreams' foreachRDD
   without running into serialization issues. */
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {
    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Call the update function every time you need a refreshed broadcast variable.

answered Sep 27 '22 by Ram Ghadiyaram