In spark, how does broadcast work?

This is a very simple question: in Spark, broadcast can be used to send variables to executors efficiently. How does this work?

More precisely:

  • When are values sent: as soon as I call broadcast, or when the values are used?
  • Where exactly is the data sent: to all executors, or only to the ones that will need it?
  • Where is the data stored? In memory, or on disk?
  • Is there a difference in how simple variables and broadcast variables are accessed? What happens under the hood when I call the .value method?
asked Nov 18 '16 by lovasoa


2 Answers

Short answer

  • Values are sent the first time they are needed in an executor. Nothing is sent when sc.broadcast(variable) is called.
  • The data is sent only to the nodes that contain an executor that needs it.
  • The data is stored in memory. If not enough memory is available, the disk is used.
  • Yes, there is a big difference between accessing a local variable and a broadcast variable. Broadcast variables have to be downloaded the first time they are accessed.
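
The short answer can be illustrated with a small pure-Python simulation (this is not Spark code; FakeBroadcast and value_on are hypothetical names): nothing leaves the driver when the variable is created, one transfer happens per executor on first access, and later accesses hit the executor's cache.

```python
import pickle

class FakeBroadcast:
    """Toy model of a Spark broadcast variable (not the real API).

    The driver only serializes the value; nothing is sent to an
    executor until that executor first accesses the value.
    """
    def __init__(self, value):
        self._driver_blob = pickle.dumps(value)  # stays on the driver
        self.transfers = 0                       # simulated network sends

    def value_on(self, executor_cache):
        """Access the value from a given executor's local cache."""
        if "blob" not in executor_cache:         # first access: download
            executor_cache["blob"] = self._driver_blob
            self.transfers += 1
        return pickle.loads(executor_cache["blob"])

b = FakeBroadcast({"a": 1})
exec1, exec2 = {}, {}
assert b.transfers == 0          # creating the broadcast sends nothing
b.value_on(exec1); b.value_on(exec1)
b.value_on(exec2)
print(b.transfers)               # one transfer per executor, cached after
```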

Long answer

The answer is in Spark's source, in TorrentBroadcast.scala.

  1. When sc.broadcast is called, a new TorrentBroadcast object is instantiated from BroadcastFactory.scala. The following happens in writeBlocks(), which is called when the TorrentBroadcast object is initialized:

    1. The object is cached locally in deserialized form, using the MEMORY_AND_DISK storage level.
    2. It is serialized.
    3. The serialized version is split into 4 MB blocks, which are compressed[0] and saved locally[1].
  2. When new executors are created, they only have the lightweight TorrentBroadcast object, which contains only the broadcast object's identifier and its number of blocks.

  3. The TorrentBroadcast object has a lazy[2] property that contains its value. When the value method is called, this lazy property is returned. So the first time value is called within a task, the following happens:

    1. In a random order, blocks are fetched from the local block manager and uncompressed.
    2. If they are not present in the local block manager, getRemoteBytes is called on the block manager to fetch them. Network traffic happens only at that time.
    3. If the block wasn't present locally, it is cached using MEMORY_AND_DISK_SER.
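
The serialize / split / compress / reassemble pipeline above can be sketched in a few lines of plain Python. This is a simplification under stated assumptions: it uses pickle and zlib from the standard library where Spark uses its own serializers and lz4, and a list stands in for the block manager; only the 4 MB block size and the random fetch order mirror Spark's behavior.

```python
import pickle
import random
import zlib

BLOCK_SIZE = 4 * 1024 * 1024  # Spark's default broadcast block size

def write_blocks(obj):
    """Serialize, split into fixed-size blocks, compress each block."""
    data = pickle.dumps(obj)
    chunks = [data[i:i + BLOCK_SIZE] for i in range(0, len(data), BLOCK_SIZE)]
    return [zlib.compress(c) for c in chunks]  # Spark uses lz4 by default

def read_blocks(blocks):
    """Fetch blocks in random order (as Spark does), then reassemble."""
    fetched = {}
    for i in random.sample(range(len(blocks)), len(blocks)):
        fetched[i] = zlib.decompress(blocks[i])
    return pickle.loads(b"".join(fetched[i] for i in range(len(blocks))))

original = list(range(100_000))
assert read_blocks(write_blocks(original)) == original
```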

[0] Compressed with lz4 by default. This can be tuned: the codec is set with spark.io.compression.codec, and compression can be disabled with spark.broadcast.compress.

[1] The blocks are stored in the local block manager using MEMORY_AND_DISK_SER, which means that partitions that don't fit in memory are spilled to disk. Each block has a unique identifier, computed from the identifier of the broadcast variable and its offset. The block size is configurable (spark.broadcast.blockSize); it is 4 MB by default.

[2] A lazy val in Scala is a value that is evaluated the first time it is accessed, and then cached. See the documentation.
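
For readers more familiar with Python, functools.cached_property gives roughly the same semantics as a Scala lazy val: the body runs on first access only, and later accesses return the cached result. The Broadcast class and its value are made up for illustration.

```python
from functools import cached_property

class Broadcast:
    calls = 0  # counts how many times the lazy body actually runs

    @cached_property
    def value(self):               # computed on first access, then cached
        Broadcast.calls += 1
        return "downloaded blocks"

b = Broadcast()
b.value; b.value; b.value
print(Broadcast.calls)             # evaluated once, like a Scala lazy val
```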

answered Nov 17 '22 by lovasoa


  • As soon as it is broadcast.
  • It is sent to all executors using a torrent-like protocol, but loaded only when needed.
  • Once loaded, variables are stored deserialized in memory.
  • When .value is called, it:

    • validates that the broadcast hasn't been destroyed
    • lazily loads the variable from the block manager
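
Those two steps of .value can be sketched as a toy property (hypothetical names, not the real Spark API): validate first, then load lazily from a dict standing in for the block manager.

```python
class FakeBroadcastVar:
    """Toy .value semantics: check the variable hasn't been destroyed,
    then lazily load it from a block-manager stand-in on first access."""
    def __init__(self, block_manager, block_id):
        self._bm, self._id = block_manager, block_id
        self._cached, self._destroyed = None, False

    def destroy(self):
        self._destroyed = True
        self._cached = None

    @property
    def value(self):
        if self._destroyed:
            raise RuntimeError("Attempted to use a destroyed broadcast")
        if self._cached is None:
            self._cached = self._bm[self._id]   # lazy load on first access
        return self._cached

bm = {"broadcast_0": [1, 2, 3]}
v = FakeBroadcastVar(bm, "broadcast_0")
assert v.value == [1, 2, 3]
v.destroy()                        # any later .value access now raises
```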
answered Nov 17 '22 by fc787bc0