
How to Reference Spark Broadcast Variables Outside of Scope

All the examples I've seen for Spark broadcast variables define them in the scope of the functions using them (map(), join(), etc.). I would like to use both a map() function and mapPartitions() function that reference a broadcast variable, but I would like to modularize them so I can use the same functions for unit testing purposes.

  • How can I accomplish this?

A thought I had was to curry the function so that I pass a reference to the broadcast variable when using either a map or mapPartitions call.

  • Does passing around the reference to the broadcast variable have any performance implications that do not arise when the functions are defined inside the original scope?

I had something like this in mind (pseudo-code):

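// firstFile.scala
// ---------------

import org.apache.spark.broadcast.Broadcast

object firstFile {
  // SomeRow is a placeholder row type; substitute your own.
  type SomeRow = (Int, String)

  // bcast.value is read only when a row is processed on an executor.
  def mapper(bcast: Broadcast[Map[Int, Int]])(row: SomeRow): Int =
    bcast.value(row._1)

  def mapMyPartition(bcast: Broadcast[Map[Int, Int]])(iter: Iterator[Int]): Iterator[Int] = {
    // Read the broadcast value once per partition rather than once per element.
    val broadcastVariable = bcast.value

    for {
      i <- iter
    } yield broadcastVariable(i)
  }
}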


// secondFile.scala
// ----------------

import firstFile.{mapMyPartition, mapper}

val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))

rdd
 .map(mapper(bcastVariable))
 .mapPartitions(mapMyPartition(bcastVariable))
iralls asked Apr 25 '16 19:04



2 Answers

Your solution should work fine. In both cases, the function passed to map or mapPartitions holds a reference to the broadcast variable itself when it is serialized, not to its value, and bcast.value is called only when the function actually runs on the node.

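What needs to be avoided is something like the following, where bcast.value is read as soon as mapper(bcast) is called on the driver, so the returned function captures the value itself rather than the broadcast reference:

def mapper(bcast: Broadcast[Map[Int, Int]]): SomeRow => Int = {
  val value = bcast.value // read when mapper(bcast) is called, not per row
  row => value(row._1)    // the closure now holds the Map itself
}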
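
The difference in when .value is read can be checked without a cluster. The sketch below is only an illustration: FakeBroadcast is a hypothetical stand-in for org.apache.spark.broadcast.Broadcast (it is not part of Spark) that counts reads, SomeRow is a placeholder type, and eagerMapper is a name made up here for the form to avoid. It shows evaluation timing only, not Spark's serialization behavior.

```scala
object BroadcastTiming {
  // Hypothetical stand-in for Spark's Broadcast that counts how often
  // .value is read. Not part of Spark; for illustration only.
  final class FakeBroadcast[T](underlying: T) {
    var reads: Int = 0
    def value: T = { reads += 1; underlying }
  }

  type SomeRow = (Int, String) // placeholder row type

  // Curried form from the question: .value is read when a row is processed.
  def mapper(bcast: FakeBroadcast[Map[Int, Int]])(row: SomeRow): Int =
    bcast.value(row._1)

  // Form to avoid: .value is read while the function is being built.
  def eagerMapper(bcast: FakeBroadcast[Map[Int, Int]]): SomeRow => Int = {
    val value = bcast.value
    row => value(row._1)
  }

  def main(args: Array[String]): Unit = {
    val bcast = new FakeBroadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))

    val deferred: SomeRow => Int = mapper(bcast)
    println(bcast.reads) // 0: building the curried function reads nothing

    val eager = eagerMapper(bcast)
    println(bcast.reads) // 1: the value was read while building the function

    println(deferred((1, "a"))) // 2
    println(eager((2, "b")))    // 3
  }
}
```

With a real Broadcast, the "read while building" step happens on the driver, which is why the returned function ends up carrying the full value.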
Alexey Romanov answered Oct 03 '22 22:10



You are doing this correctly. You just have to remember to pass the broadcast reference and not the value itself. Using your example, the difference can be shown as follows:

a) efficient way:

// the whole Map[Int, Int] is serialized and sent to every worker
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd
.map(mapper(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker
.mapPartitions(mapMyPartition(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker

b) inefficient way:

// the whole Map[Int, Int] is serialized and sent to every worker
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3)) 

rdd
.map(mapper(bcastVariable.value)) // the whole Map[Int, Int] is captured in the closure and serialized again along with it
.mapPartitions(mapMyPartition(bcastVariable.value)) // the whole Map[Int, Int] is captured in the closure and serialized again along with it

Of course, in the second example mapper and mapMyPartition would have slightly different signatures: they would take a Map[Int, Int] instead of a Broadcast.
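
For reference, the signatures for variant (b) might look roughly like this. This is a sketch: the object name PlainMapVariant and the SomeRow alias are placeholders invented here, and the element types are assumed from the Map(0 -> 1, 1 -> 2, 2 -> 3) in the question.

```scala
object PlainMapVariant {
  type SomeRow = (Int, String) // placeholder row type

  // Takes the Map itself, so any closure built from it carries the whole Map.
  def mapper(table: Map[Int, Int])(row: SomeRow): Int =
    table(row._1)

  // A Map[Int, Int] is itself a function Int => Int, so it can be passed to map.
  def mapMyPartition(table: Map[Int, Int])(iter: Iterator[Int]): Iterator[Int] =
    iter.map(table)
}
```

Functions of this shape are easy to call from a unit test with a plain Map, but used directly in map or mapPartitions they bring back the inefficiency described in (b).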

Paweł Jurczenko answered Oct 04 '22 00:10
