How to remove / dispose a broadcast variable from heap in Spark?

To broadcast a variable so that it occurs exactly once in memory per node on a cluster, one can do:

val myVarBroadcasted = sc.broadcast(myVar)

and then retrieve it in RDD transformations like so:

myRdd.map(blar => {
  val myVarRetrieved = myVarBroadcasted.value
  // some code that uses it
}).someAction

But suppose I now wish to perform some more actions with a new broadcast variable. What if I don't have enough heap space because of the old broadcast variables? I want a function like

myVarBroadcasted.remove() 

However, I can't seem to find a way of doing this.

Also, a closely related question: where do the broadcast variables go? Do they go into the cache fraction of the total memory, or just into the heap fraction?

samthebest asked Jul 05 '14 10:07



2 Answers

If you want to remove the broadcast variable from both the executors and the driver, use destroy; unpersist only removes it from the executors:

myVarBroadcasted.destroy() 

This method is blocking.
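
As a minimal sketch of what destroy implies for later use, assuming a Spark version in which destroy() is public on Broadcast and a local master; the object name, app name and sample data are hypothetical and only serve the illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Try

object DestroyBroadcastSketch {
  // Uses a broadcast in one job, destroys it, then reports whether
  // a later access on the driver fails.
  def run(sc: SparkContext): (Seq[String], Boolean) = {
    val myVar = Map(1 -> "one", 2 -> "two") // hypothetical payload
    val myVarBroadcasted = sc.broadcast(myVar)

    val labels = sc.parallelize(Seq(1, 2, 3))
      .map(k => myVarBroadcasted.value.getOrElse(k, "unknown"))
      .collect()
      .toSeq

    // Releases the data on the executors and on the driver.
    myVarBroadcasted.destroy()

    // A destroyed broadcast is no longer valid, so reading it should throw.
    val failsAfterDestroy = Try(myVarBroadcasted.value).isFailure
    (labels, failsAfterDestroy)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for this sketch.
    val conf = new SparkConf().setMaster("local[2]").setAppName("destroy-broadcast-sketch")
    val sc = new SparkContext(conf)
    try {
      val (labels, failsAfterDestroy) = run(sc)
      println(labels.mkString(", "))
      println(s"access after destroy fails: $failsAfterDestroy")
    } finally {
      sc.stop()
    }
  }
}
```

Because a destroyed broadcast cannot be read again, destroy fits the case where the value is definitely finished with; if it may still be needed, unpersist is the gentler option.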

Gianmario Spacagna answered Oct 11 '22 11:10


You are looking for unpersist, available since Spark 1.0.0:

myVarBroadcasted.unpersist(blocking = true) 

Broadcast variables are stored as ArrayBuffers of deserialized Java objects or as serialized ByteBuffers. (Storage-wise they are treated similarly to RDDs; confirmation needed.)

The unpersist method removes them from both memory and disk on each executor node. The value stays on the driver node, however, so it can be re-broadcast.
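
The behaviour described here can be sketched as follows, assuming a local master; the object name, app name and sample data are hypothetical. The broadcast is used in one job, unpersisted, and then used again in a second job, at which point Spark sends it to the executors again:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnpersistBroadcastSketch {
  // Returns the results of a job run before unpersist and a job run after it.
  def run(sc: SparkContext): (Int, Int) = {
    val lookup = sc.broadcast(Array(10, 20, 30)) // hypothetical payload

    val before = sc.parallelize(0 to 2)
      .map(i => lookup.value(i))
      .reduce(_ + _)

    // Drops the cached copies on the executors and waits until that is done.
    lookup.unpersist(blocking = true)

    // The handle is still valid: the driver keeps the value, so the next
    // job that reads it causes the data to be shipped to executors again.
    val after = sc.parallelize(0 to 2)
      .map(i => lookup.value(i) * 2)
      .reduce(_ + _)

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for this sketch.
    val conf = new SparkConf().setMaster("local[2]").setAppName("unpersist-broadcast-sketch")
    val sc = new SparkContext(conf)
    try {
      val (before, after) = run(sc)
      println(s"before unpersist: $before, after unpersist: $after")
    } finally {
      sc.stop()
    }
  }
}
```

Since the driver copy survives, unpersist frees executor memory between stages but does not by itself reduce what the driver holds.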

Shyamendra Solanki answered Oct 11 '22 10:10