I have some data that needs to be classified in Spark Streaming. The classification key-values are loaded at the beginning of the program into a HashMap, so each incoming data packet needs to be compared against these keys and tagged accordingly.
I realize that Spark has broadcast variables and accumulators for distributing objects, but the examples in the tutorials only use simple variables.
How can I share my HashMap with all Spark workers using a broadcast variable? Alternatively, is there a better way to do this?
I am writing my Spark Streaming application in Java.
In Spark you can broadcast any serializable object the same way. This is the best approach, because the data is shipped to each worker only once and can then be used in any of the tasks.
Scala:
val br = ssc.sparkContext.broadcast(Map(1 -> 2))
Java:
Broadcast<HashMap<String, String>> br = ssc.sparkContext().broadcast(new HashMap<>());
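For the streaming case, a minimal end-to-end sketch might look like the following. The socket source, port, and map contents are hypothetical stand-ins for your own input and classification data; the point is that the lookup inside map runs on the workers against the broadcast copy:

import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ClassifyStream {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("classify-stream");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Load the classification key-values once on the driver.
        HashMap<String, String> classification = new HashMap<>();
        classification.put("key1", "tag1");
        classification.put("key2", "tag2");

        // JavaSparkContext.broadcast needs no ClassTag; the map is shipped to each worker once.
        Broadcast<HashMap<String, String>> br = ssc.sparkContext().broadcast(classification);

        // Hypothetical source: one record per line from a socket.
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);

        // Look each record up in the broadcast map on the workers and tag it.
        JavaDStream<String> tagged = lines.map(
                record -> record + "," + br.getValue().getOrDefault(record, "unknown"));
        tagged.print();

        ssc.start();
        ssc.awaitTermination();
    }
}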
Here is a fuller example of how you would broadcast a HashMap in Java:
In your Spark application, you create or load the HashMap, then use the SparkSession to broadcast it.
HashMap<String, String> bcMap = new HashMap<>();
bcMap.put("key1", "val1");
bcMap.put("key2", "val2");
Broadcast<HashMap> bcVar = this.sparkSession.sparkContext().broadcast(bcMap, classTag(HashMap.class));
And you need the following helper to create the ClassTag (the Scala SparkContext's broadcast method requires one, unlike the Java API):
private static <T> ClassTag<T> classTag(Class<T> clazz) {
    return scala.reflect.ClassManifestFactory.fromClass(clazz);
}
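ClassManifestFactory belongs to Scala's deprecated manifest API; assuming a reasonably recent Scala version, an equivalent helper can be written against the ClassTag factory object directly:

// Same usage as above, just a non-deprecated factory.
private static <T> ClassTag<T> classTag(Class<T> clazz) {
    return scala.reflect.ClassTag$.MODULE$.apply(clazz);
}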
You can then refer to the broadcast inside Spark functions such as map, as below:
HashMap<String, String> bcVal = bcVar.getValue();
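For instance, here is a sketch of the lookup inside a Dataset map; the input Dataset and the "unknown" default tag are hypothetical:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// 'input' is a Dataset<String> of incoming records (hypothetical).
Dataset<String> tagged = input.map(
        (MapFunction<String, String>) record -> {
            HashMap<String, String> bcVal = bcVar.getValue();
            return record + "," + bcVal.getOrDefault(record, "unknown");
        },
        Encoders.STRING());

Calling getValue() inside the function means each executor reads its local broadcast copy; only the tag lookup, not the map itself, happens per record.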