
In Apache Spark, what is the difference between using mapPartitions and combining a broadcast variable with map?

In Spark, we use a broadcast variable to give each machine a read-only copy of a variable. We usually create the broadcast variable outside the closure (for example, a lookup table needed by the closure) to improve performance.

There is also a Spark transformation called mapPartitions, which seems to achieve the same thing (using a shared variable to improve performance). For example, in mapPartitions we can share one database connection across all the elements of a partition.

So what is the difference between the two? Can we use them interchangeably just for shared variables?

xuanyue asked Dec 28 '15 21:12



2 Answers

While the answer provided by KrisP highlights all the important differences, I think it is worth noting that mapPartitions is just a low-level building block behind higher-level transformations, not a method for achieving shared state.

Although mapPartitions can be used to make shared-like state explicit, it is technically not shared (its lifetime is limited to the mapPartitions closure), and there are other means to achieve it. In particular, variables which are referenced inside closures are shared inside a partition. To illustrate that, let's play a little with singletons:

object DummySharedState {
  var i = 0L
  def get(x: Any) =  {
    i += 1L
    i
  }
}

sc.parallelize(1 to 100, 1).map(DummySharedState.get).max
// res3: Long = 100
sc.parallelize(1 to 100, 2).map(DummySharedState.get).max
// res4: Long = 50
sc.parallelize(1 to 100, 50).map(DummySharedState.get).max
// res5: Long = 2

and a similar thing in PySpark:

  • singleton module dummy_shared_state.py:

    i = 0
    def get(x):
        global i
        i += 1
        return i
    
  • main script:

    from pyspark import SparkConf, SparkContext
    import dummy_shared_state
    
    master = "spark://..."
    conf = (SparkConf()
        .setMaster(master)
        .set("spark.python.worker.reuse", "false"))
    # Create the context from the configuration above
    sc = SparkContext(conf=conf)
    
    sc.addPyFile("dummy_shared_state.py")
    sc.parallelize(range(100), 1).map(dummy_shared_state.get).max()
    ## 100
    sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
    ## 50 
    

Please note that the spark.python.worker.reuse option is set to false. If you keep the default value (true), Python workers are reused between jobs and the module state persists, so you'll actually see something like this:

sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 50
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 100
sc.parallelize(range(100), 2).map(dummy_shared_state.get).max()
## 150

At the end of the day you have to distinguish between three different things:

  • broadcast variables, which are designed to reduce network traffic and memory footprint by keeping a copy of the variable on the worker instead of shipping it with each task
  • variables defined outside the closure and referenced inside it, which have to be shipped with each task and are shared within that task
  • variables defined inside the closure, which are not shared

On top of that, there are some Python-specific gotchas related to the usage of persistent interpreters.

Still, there is no practical difference between map (filter or any other transformation) and mapPartitions when it comes to variable lifetime.

zero323 answered Oct 02 '22 00:10


broadcast is used to ship an object to every worker node. This object is then shared among all partitions on that node (and the value, i.e. the object, is the same on every node in the cluster). The goal of broadcasting is to save on network costs when you use the same data in many different tasks/partitions on a worker node.
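As a rough illustration of the broadcast-plus-map pattern, here is a minimal PySpark sketch. The names make_lookup_fn, run_on_spark and the country-code table are made up for this example, and the local[2] master is only an assumption to keep it self-contained:

```python
def make_lookup_fn(lookup):
    """Build a function for map(); `lookup` is anything with a `.value` dict.

    On Spark, `lookup` would be a Broadcast object, so the table is fetched
    once per executor instead of being serialized into every task.
    """
    def translate(code):
        return lookup.value.get(code, "unknown")
    return translate


def run_on_spark():
    # Hypothetical driver code; imported here so the helper above can be
    # used and tested without PySpark installed.
    from pyspark import SparkContext

    sc = SparkContext("local[2]", "broadcast-sketch")
    try:
        # Illustrative lookup table, not taken from the original question.
        country_names = sc.broadcast({"de": "Germany", "fr": "France"})
        codes = sc.parallelize(["de", "fr", "xx"], 2)
        return codes.map(make_lookup_fn(country_names)).collect()
    finally:
        sc.stop()
```

Calling run_on_spark() on a machine with PySpark installed should run the lookup on two partitions; the table itself is read-only inside the tasks.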

mapPartitions, in contrast, is a method available on RDDs, and works like map, only on whole partitions rather than on single elements. Yes, you can define new objects, such as a JDBC connection, which will then be unique to each partition. However, you can't share them among different partitions, much less among different nodes.
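The per-partition resource pattern could look roughly like the following PySpark sketch. FakeConnection is a hypothetical stand-in for a real database client, and enrich_partition and run_on_spark are names invented for this example:

```python
class FakeConnection:
    """Hypothetical stand-in for a real database connection."""

    opened = 0  # number of connections created in this process

    def __init__(self):
        FakeConnection.opened += 1
        self.closed = False

    def lookup(self, key):
        # Placeholder for a real query.
        return key * 10

    def close(self):
        self.closed = True


def enrich_partition(rows):
    """Open one connection, reuse it for every row of the partition."""
    conn = FakeConnection()
    try:
        for row in rows:
            yield row, conn.lookup(row)
    finally:
        # Runs once the partition's iterator is exhausted or discarded.
        conn.close()


def run_on_spark():
    # Hypothetical driver code; imported here so the functions above can be
    # used and tested without PySpark installed.
    from pyspark import SparkContext

    sc = SparkContext("local[2]", "mappartitions-sketch")
    try:
        rdd = sc.parallelize(range(6), 2)
        return rdd.mapPartitions(enrich_partition).collect()
    finally:
        sc.stop()
```

Each task creates its own connection, so nothing here is shared across partitions or nodes; the saving is only that setup happens once per partition rather than once per element.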

KrisP answered Oct 02 '22 00:10