 

How do PySpark broadcast variables work?

I know it uses pickle to serialize data, ships it across the nodes, and keeps it in memory there. What I'm confused about is why the following syntax works in PySpark.

def main():
    sc = SparkContext()
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value

Why doesn't the above code raise a "'V' is not defined" error? I searched the broadcast-related source code in PySpark but didn't find any clue.

asked Nov 26 '14 by dennis.s

People also ask

How are broadcast variables defined in PySpark?

Broadcast variables are used to share a read-only copy of data across all nodes. The variable is cached on every machine rather than being shipped to the machines with each task. The following is the signature of the Broadcast class in PySpark: class pyspark.Broadcast ( sc = None, value = None, pickle_registry = None, path = None )
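The read pattern of that interface can be sketched with a toy stand-in class (this is not the real pyspark.Broadcast implementation, just a minimal illustration of how a wrapped value is exposed read-only through .value):

```python
# Hypothetical stand-in for pyspark.Broadcast: the wrapper holds a value once,
# and workers read it via the .value property (there is no setter, so the
# broadcast data stays read-only from the worker's point of view).
class Broadcast:
    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

V = Broadcast([1, 2, 3])
print(V.value)  # [1, 2, 3]
```

In real PySpark you would obtain such a wrapper from `sc.broadcast(someValue)` rather than constructing it yourself.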

How does PySpark broadcast join work?

It is a join of a large data frame with a smaller data frame in PySpark. It reduces data shuffling by broadcasting the smaller data frame to the nodes of the PySpark cluster: the small side is sent once to every node, so the large side never has to be shuffled by key.

What exactly happens in case of broadcasting in Spark?

Broadcasting in Spark works by first collecting the data to the driver and then having the driver broadcast it back out to the executors. In other words, the driver does a collect to gather all the data, and then it broadcasts that data to the executors.

What is true about broadcast variables in Spark?

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
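The size argument above can be illustrated without Spark at all (a rough sketch with hypothetical payloads): a serialized task that embeds the dataset carries all of it every time, while a task that carries only a small broadcast handle stays tiny no matter how big the data is.

```python
import pickle

big_data = list(range(100_000))

# Shipping a copy with every task: each serialized task contains the payload.
task_with_data = ("task", big_data)
# Broadcast style: the data is shipped once; tasks carry only a small handle.
task_with_handle = ("task", "broadcast_0")

print(len(pickle.dumps(task_with_data)) > 100 * len(pickle.dumps(task_with_handle)))  # True
```

This is the efficiency the documentation is describing: with many tasks, only the handle is serialized per task, not the dataset.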


1 Answer

I believe your problem is simply a Python scope issue. If you try the following non-Spark Python code, it'll similarly error with "'V' is not defined":

def runner(func):
    func()

def main():
    V = 22
    A = runner(worker)

def worker():
    print(V)

if __name__ == '__main__':
    main()

One fix is you can move worker() to be inside of main() (or alternatively, make V a global variable):

def main():
    sc = SparkContext()
    someValue = rand()
    V = sc.broadcast(someValue)
    def worker(element):
        return element * V.value  # return the result so map() collects it
    A = sc.parallelize().map(worker)
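The other fix mentioned above, making V a global variable, can be sketched in plain (non-Spark) Python with hypothetical values:

```python
# Global-variable variant of the fix: V lives at module level, so worker()
# can resolve it through Python's normal name lookup.
V = None

def worker(element):
    return element * V

def main():
    global V
    V = 22
    return worker(2)

print(main())  # 44
```

Either way, the point is the same: worker must be able to see V through Python's scoping rules (enclosing scope or module globals) before Spark ever serializes the function.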

answered Oct 11 '22 by Dolan Antenucci