Is there any way (or any plans) to turn Spark distributed collections (`RDD`s, `DataFrame`s or `Dataset`s) directly into `Broadcast` variables without the need for a `collect`? The public API doesn't seem to have anything "out of the box", but can something be done at a lower level?
I can imagine there is some 2x speedup potential (or more?) for this kind of operation. To explain what I mean in detail, let's work through an example:
```scala
val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)
```
This causes all the data to be collected to the driver, then the data is broadcasted. This means the data is sent over the network essentially twice.
What would be nice is something like this:
```scala
val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)
```
Here Spark could bypass collecting the data altogether and just move the data between the nodes.
BONUS
Furthermore, there could be a Monoid-like API (a bit like `combineByKey`) for situations where the `.toMap` or whatever operation on `Array[T]` is expensive but can possibly be done in parallel. E.g. constructing certain Trie structures can be expensive; this kind of functionality could open up great scope for algorithm design. This CPU activity could also run while the IO is in progress, whereas the current broadcast mechanism is blocking (i.e. all IO, then all CPU, then all IO again).
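To make that concrete, here is a purely hypothetical sketch of what such an API could look like; nothing below exists in Spark, and the names `toBroadcast` and `toBroadcastCombined` are invented just to illustrate the shape of the idea:

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

// Hypothetical API -- NOT part of Spark, illustration only.
implicit class BroadcastableRDD[T](rdd: RDD[T]) {
  // Simple form: ship the RDD's contents to every executor node-to-node and
  // build the final structure there from the full Array[T].
  def toBroadcast[B](build: Array[T] => B): Broadcast[B] = ???

  // Monoid-like form (in the spirit of combineByKey / aggregate): partial
  // structures are built per chunk in parallel, possibly while the torrent
  // IO is still in flight, and then merged with an associative operation.
  def toBroadcastCombined[B](
      createCombiner: T => B,
      mergeValue: (B, T) => B,
      mergeCombiners: (B, B) => B): Broadcast[B] = ???
}
```

The second signature mirrors `combineByKey`, so an expensive structure (e.g. a trie) could be built incrementally per chunk and merged, overlapping the CPU work with the network IO.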
CLARIFICATION
Joining is not the (main) use case here; it can be assumed that I use the broadcasted data structure sparsely. For example, the keys in `someOtherRdd` by no means cover the keys in `myUberMap`, but I don't know which keys I need until I traverse `someOtherRdd`, AND suppose I use `myUberMap` multiple times.
I know that all sounds a bit vague, but the point is for more general machine learning algorithm design.
While this is an interesting idea, I will argue that, although technically possible, it has very limited practical applications. Obviously I cannot speak for the PMC, so I cannot say if there are any plans to implement this type of broadcasting mechanism at all.
Possible implementation:
Since Spark already provides a torrent broadcasting mechanism, whose behavior is described as follows:
> The driver divides the serialized object into small chunks and stores those chunks in the `BlockManager` of the driver. On each executor, the executor first attempts to fetch the object from its `BlockManager`. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available. Once it gets the chunks, it puts the chunks in its own `BlockManager`, ready for other executors to fetch from.
it should be possible to reuse the same mechanism for direct node-to-node broadcasting.
It is worth noting that this approach cannot completely eliminate driver communication. Even though blocks could be created locally, you still need a single source of truth to advertise the set of blocks to fetch.
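For reference, this torrent mechanism is what already serves the ordinary public API today; only the initial chunking happens on the driver. A minimal sketch of the existing behavior (standard usage, nothing new):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Driver side: the value is serialized, split into chunks and stored in the
// driver's BlockManager (torrent-style broadcast is Spark's default mechanism).
val lookup = sc.broadcast(Map("a" -> "1", "b" -> "2"))

// Executor side: the first access to lookup.value fetches the chunks from the
// driver and/or peer executors, then caches them in the local BlockManager.
val data = sc.parallelize(Seq("a", "b", "c"))
val resolved = data.map(k => lookup.value.getOrElse(k, "missing")).collect()
```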
Limited applications
One problem with broadcast variables is that they are quite expensive. Even if you can eliminate the driver bottleneck, two problems remain: the memory required to store the deserialized data on each executor, and the cost of transferring the broadcast data to every executor.
The first problem should be relatively obvious. It is not only about direct memory usage but also about GC cost and its effect on overall latency. The second one is rather subtle. I partially covered this in my answer to "Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark", but let's discuss this further.
From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if the dataset is large enough for the driver to become a bottleneck, it is unlikely to be a good candidate for broadcasting, and a targeted approach like a hash join may be preferable in practice.
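As a rough back-of-the-envelope illustration of that point (the figures below are assumptions, not measurements, and real costs depend on serialization, compression and locality):

```scala
// Back-of-the-envelope only: assumed figures, not measurements.
val datasetBytes = 2L * 1024 * 1024 * 1024   // assume a 2 GB serialized "uber map"
val numExecutors = 100L                      // assume 100 executors

// Broadcasting ships (nearly) the whole dataset to every executor, so total
// network traffic scales with data size times cluster size, much like a Cartesian product:
val broadcastTraffic = datasetBytes * numExecutors   // ~200 GB

// A shuffled hash join moves each record of this dataset to one target
// partition, i.e. traffic on the order of the dataset size itself:
val shuffleTraffic = datasetBytes                    // ~2 GB

println(f"broadcast ~ ${broadcastTraffic.toDouble / (1L << 30)}%.0f GB, " +
        f"shuffle ~ ${shuffleTraffic.toDouble / (1L << 30)}%.0f GB")
```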
Alternatives:
There are some methods which can be used to achieve results similar to a direct broadcast and address the issues enumerated above, including: