All the examples I've seen for Spark broadcast variables define them in the scope of the functions using them (map(), join(), etc.). I would like to use both a map() function and mapPartitions() function that reference a broadcast variable, but I would like to modularize them so I can use the same functions for unit testing purposes.
A thought I had was to curry the function so that I pass a reference to the broadcast variable when using either a map or mapPartitions call.
I had something like this in mind (pseudo-code):
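// firstFile.scala
// ---------------
import org.apache.spark.broadcast.Broadcast

object firstFile {
  // SomeRow is assumed to be a tuple-like row whose first field is an Int key.
  def mapper(bcast: Broadcast[Map[Int, Int]])(row: SomeRow): Int = {
    bcast.value(row._1)
  }

  def mapMyPartition(bcast: Broadcast[Map[Int, Int]])(iter: Iterator[Int]): Iterator[Int] = {
    // Read the broadcast value once per partition.
    val broadcastVariable = bcast.value
    for {
      i <- iter
    } yield broadcastVariable(i)
  }
}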
// secondFile.scala
// ----------------
import firstFile.{mapMyPartition, mapper}
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
  .map(mapper(bcastVariable))
  .mapPartitions(mapMyPartition(bcastVariable))
Your solution should work fine. In both cases, the function passed to map() or mapPartitions() contains only a reference to the broadcast variable when it is serialized, not the value itself, and bcast.value is called only when the function is evaluated on the node.
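What needs to be avoided is something like the following, where bcast.value is read while the function is being built, so the returned closure captures the value itself rather than the broadcast reference:
def mapper(bcast: Broadcast[Map[Int, Int]]): SomeRow => Int = {
  val value = bcast.value // read up front; the closure below captures the Map itself
  row => value(row._1)
}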
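To make the difference between the two forms concrete without a cluster, here is a minimal sketch in plain Scala. FakeBroadcast is a hypothetical stand-in that I made up for illustration (it is not Spark's Broadcast class); it only counts how often .value is read. SomeRow is assumed to be an (Int, String) tuple.

```scala
// Hypothetical stand-in for Spark's Broadcast[T]: it only counts reads of .value.
final class FakeBroadcast[T](payload: T) {
  var reads: Int = 0
  def value: T = { reads += 1; payload }
}

object BroadcastCapture {
  type SomeRow = (Int, String) // assumed row shape: (key, payload)

  // Curried form from the question: .value is read inside the body, per row.
  def deferredMapper(bcast: FakeBroadcast[Map[Int, Int]])(row: SomeRow): Int =
    bcast.value(row._1)

  // Form to avoid: .value is read while the function is being built,
  // so the returned closure holds the Map itself.
  def eagerMapper(bcast: FakeBroadcast[Map[Int, Int]]): SomeRow => Int = {
    val value = bcast.value
    row => value(row._1)
  }

  // Returns how many times .value was read while merely building each function.
  def readsWhileBuilding(): (Int, Int) = {
    val a = new FakeBroadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
    val b = new FakeBroadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
    val deferred: SomeRow => Int = deferredMapper(a) // no read of a.value yet
    val eager: SomeRow => Int = eagerMapper(b)       // b.value already read here
    (a.reads, b.reads)
  }
}
```

Building the curried function leaves the read count at zero, while the eager variant has already read the value once before any row is processed. In a real job, that early read is what puts the Map into the serialized closure.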
You are doing this correctly. You just have to remember to pass the broadcast reference and not the value itself. Using your example the difference might be shown as follows:
a) efficient way:
// the whole Map[Int, Int] is serialized and sent to every worker
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
  .map(mapper(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker
  .mapPartitions(mapMyPartition(bcastVariable)) // only the reference to the Map[Int, Int] is serialized and sent to every worker
b) inefficient way:
// the whole Map[Int, Int] is serialized and sent to every worker
val bcastVariable = sc.broadcast(Map(0 -> 1, 1 -> 2, 2 -> 3))
rdd
  .map(mapper(bcastVariable.value)) // the whole Map[Int, Int] is serialized into the closure and shipped with every task
  .mapPartitions(mapMyPartition(bcastVariable.value)) // the whole Map[Int, Int] is serialized into the closure and shipped with every task
Of course, in the second example mapper and mapMyPartition would have slightly different signatures.
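For reference, the value-taking signatures might look roughly like the sketch below. The object name ValueTakingVariants and the SomeRow alias are assumptions for illustration. These versions take the Map directly, so they can be exercised in a plain unit test, but passing them bcastVariable.value in a job is the inefficient pattern (b) above.

```scala
object ValueTakingVariants {
  type SomeRow = (Int, String) // assumed row shape: (key, payload)

  // Takes the lookup Map itself instead of a broadcast reference.
  def mapper(lookup: Map[Int, Int])(row: SomeRow): Int =
    lookup(row._1)

  // Same idea for the per-partition function.
  def mapMyPartition(lookup: Map[Int, Int])(iter: Iterator[Int]): Iterator[Int] =
    iter.map(i => lookup(i))
}
```

One possible compromise is to keep the broadcast-taking functions as thin wrappers that call bcast.value inside their bodies and delegate to helpers like these, so the lookup logic stays testable while only the broadcast reference is serialized.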