 

How to transform an RDD, DataFrame or Dataset straight to a Broadcast variable without collect?

Is there any way (or are there any plans) to be able to turn Spark distributed collections (RDDs, DataFrames or Datasets) directly into Broadcast variables without the need for a collect? The public API doesn't seem to have anything "out of the box", but can something be done at a lower level?

I can imagine there is some 2x speedup potential (or more?) for this kind of operation. To explain what I mean in detail, let's work through an example:

val myUberMap: Broadcast[Map[String, String]] =
  sc.broadcast(myStringPairRdd.collect().toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

This causes all the data to be collected to the driver, then the data is broadcast. This means the data is sent over the network essentially twice.

What would be nice is something like this:

val myUberMap: Broadcast[Map[String, String]] =
  myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap)

someOtherRdd.map(someCodeUsingTheUberMap)

Here Spark could bypass collecting the data altogether and just move the data between the nodes.

BONUS

Furthermore, there could be a Monoid-like API (a bit like combineByKey) for situations where the .toMap, or whatever operation on Array[T], is expensive but can be done in parallel. E.g. constructing certain trie structures can be expensive; this kind of functionality could result in awesome scope for algorithm design. This CPU activity could also run while the IO is running, whereas the current broadcast mechanism is blocking (i.e. all IO, then all CPU, then all IO again).
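For reference, the parallel-combine part can be approximated today with treeAggregate, which folds each partition into a partial structure on the executors and merges the partials pairwise in a tree. A minimal sketch (broadcastAsMap is a made-up helper, not a Spark API; note the final value still lands on the driver once before the broadcast, so the redundant network hop remains):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

def broadcastAsMap(sc: SparkContext,
                   rdd: RDD[(String, String)]): Broadcast[Map[String, String]] = {
  // build partial Maps per partition on the executors and merge them
  // pairwise in a tree, so the CPU work is distributed
  val merged = rdd.treeAggregate(Map.empty[String, String])(
    seqOp  = (acc, kv) => acc + kv, // fold one partition into a partial Map
    combOp = (a, b)    => a ++ b    // merge two partial Maps
  )
  sc.broadcast(merged) // the merged Map still goes driver -> executors once
}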

CLARIFICATION

Joining is not the (main) use case here; it can be assumed that I use the broadcasted data structure sparsely. For example, the keys in someOtherRdd by no means cover the keys in myUberMap, but I don't know which keys I need until I traverse someOtherRdd AND suppose I use myUberMap multiple times.

I know that all sounds a bit vague, but the point is to enable more general machine learning algorithm design.

asked Jul 12 '16 by samthebest



1 Answer

While this is an interesting idea, I will argue that although it is theoretically possible, it has very limited practical applications. Obviously I cannot speak for the PMC, so I cannot say if there are any plans to implement this type of broadcasting mechanism at all.

Possible implementation:

Since Spark already provides a torrent broadcasting mechanism, whose behavior is described as follows:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available.

Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from.

it should be possible to reuse the same mechanism for direct node-to-node broadcasting.

It is worth noting that this approach cannot completely eliminate driver communication. Even though blocks could be created locally you still need a single source of truth to advertise a set of blocks to fetch.

Limited applications

One problem with broadcast variables is that they are quite expensive. Even if you can eliminate the driver bottleneck, two problems remain:

  • Memory required to store the deserialized object on each executor.
  • Cost of transferring the broadcast data to every executor.

The first problem should be relatively obvious. It is not only about direct memory usage but also about GC cost and its effect on overall latency. The second one is rather subtle. I partially covered this in my answer to Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark, but let's discuss this further.

From a network traffic perspective, broadcasting a whole dataset is pretty much equivalent to creating a Cartesian product. So if a dataset is large enough for the driver to become a bottleneck, it is unlikely to be a good candidate for broadcasting, and a targeted approach like a hash join may be preferred in practice.
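To make that trade-off concrete with the existing API, Spark SQL exposes both strategies directly. A minimal sketch (the tables are made up for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder.appName("join-strategies").getOrCreate()
import spark.implicits._

val small = Seq(("0", "x"), ("1", "y")).toDF("key", "v")
val large = spark.range(1000000L).selectExpr("cast(id % 2 as string) as key", "id")

// broadcast hint: every row of `small` is shipped to every executor,
// so network cost scales with (size of small) x (number of executors)
val broadcastJoin = large.join(broadcast(small), "key")

// no hint: the planner is free to pick a shuffle-based join instead,
// moving each row only to the one node that needs it
val shuffledJoin = large.join(small, "key")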

Alternatives:

There are some methods which can be used to achieve results similar to a direct broadcast and address the issues enumerated above, including:

  • Passing data via a distributed file system (see the sketch after this list).
  • Using a replicated database co-located with worker nodes.
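For example, the distributed file system option is often implemented as a lazily initialized per-JVM singleton, so each executor loads the structure once from shared storage instead of receiving it through the broadcast mechanism. A minimal sketch, assuming a tab-separated file at a hypothetical path (scala.io.Source stands in here for whatever reader matches your storage layer):

object UberMapHolder {
  @volatile private var cached: Map[String, String] = _

  // loads the map at most once per executor JVM (double-checked locking)
  def get(path: String): Map[String, String] = {
    if (cached == null) synchronized {
      if (cached == null) {
        cached = scala.io.Source.fromFile(path).getLines()
          .map { line => val Array(k, v) = line.split('\t'); k -> v }
          .toMap
      }
    }
    cached
  }
}

// someOtherRdd.map { x =>
//   val m = UberMapHolder.get("/shared/uber_map.tsv") // hypothetical path
//   ... // look up keys in m as needed
// }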
answered Oct 22 '22 by zero323