I'm attempting to create broadcast variables from within Python methods (trying to abstract some utility methods I'm creating that rely on distributed operations). However, I can't seem to access the broadcast variables from within the Spark workers.
Let's say I have this setup:
def main():
    sc = SparkContext()
    SomeMethod(sc)

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value  ### NameError: global name 'V' is not defined ###
However, if I instead eliminate the SomeMethod() middleman, it works fine.
def main():
    sc = SparkContext()
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value  # works just fine
I'd rather not have to put all my Spark logic in the main method, if I can avoid it. Is there any way to broadcast variables from within local functions and have them be globally visible to the Spark workers?
Alternatively, what would be a good design pattern for this kind of situation? For example, I want to write a method specifically for Spark that is self-contained and performs a specific function I'd like to reuse.
A PySpark broadcast variable is created with the broadcast(v) method of the SparkContext class, where the argument v is the value to be broadcast.
Once the broadcast variable is created, it can be referenced inside a UDF or directly from transformations, and it can even be used as part of a join. It does not have to be passed into the UDF as a parameter; the UDF can simply refer to it.
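For illustration, here is a minimal sketch of that pattern. The SparkSession setup, the country_names lookup table, and the to_country_name UDF are made-up names for this example, not part of the question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Hypothetical lookup table broadcast to every executor
country_names = sc.broadcast({"US": "United States", "IN": "India"})

# The UDF closes over the broadcast handle; it is not passed in as a column
@udf(returnType=StringType())
def to_country_name(code):
    return country_names.value.get(code, "Unknown")

df = spark.createDataFrame([("US",), ("IN",), ("FR",)], ["code"])
df.withColumn("country", to_country_name("code")).show()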
Class Broadcast<T>: A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
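As a rough sketch of the two kinds of shared variables side by side (the lookup dictionary and the bad_records counter below are hypothetical):

from pyspark import SparkContext

sc = SparkContext()

lookup = sc.broadcast({"a": 1, "b": 2})  # read-only, cached on every executor
bad_records = sc.accumulator(0)          # only "added" to in tasks, read on the driver

def score(word):
    if word not in lookup.value:
        bad_records.add(1)               # count unknown words via the accumulator
        return 0
    return lookup.value[word]

total = sc.parallelize(["a", "b", "c"]).map(score).sum()
print(total, bad_records.value)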
I am not sure I completely understood the question, but if you need the V object inside the worker function then you should definitely pass it as a parameter; otherwise the method is not really self-contained:
def worker(V, element):
    return element * V.value
Now, in order to use it with map you need functools.partial, so that map only sees a one-parameter function:
from functools import partial

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    # Bind V positionally; map then supplies element as the remaining argument
    A = sc.parallelize().map(partial(worker, V))
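For completeness, this is roughly how the pieces could fit together. The sample data passed to parallelize, the rand import, and the collect() call are assumptions filled in here, since the original snippets omit them:

from functools import partial
from random import random as rand  # placeholder for whatever rand() the question uses
from pyspark import SparkContext

def worker(V, element):
    return element * V.value

def SomeMethod(sc, data):
    someValue = rand()
    V = sc.broadcast(someValue)
    return sc.parallelize(data).map(partial(worker, V)).collect()

def main():
    sc = SparkContext()
    print(SomeMethod(sc, [1, 2, 3]))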