
PySpark broadcast variables from local functions

I'm attempting to create broadcast variables from within Python methods (trying to abstract some utility methods I'm creating that rely on distributed operations). However, I can't seem to access the broadcast variables from within the Spark workers.

Let's say I have this setup:

def main():
    sc = SparkContext()
    SomeMethod(sc)

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value  ### NameError: global name 'V' is not defined ###

However, if I instead eliminate the SomeMethod() middleman, it works fine.

def main():
    sc = SparkContext()
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value   # works just fine

I'd rather not have to put all my Spark logic in the main method, if I can. Is there any way to broadcast variables from within local functions and have them be globally visible to the Spark workers?

Alternatively, what would be a good design pattern for this kind of situation--e.g., I want to write a method specifically for Spark which is self-contained and performs a specific function I'd like to re-use?

asked Nov 16 '14 by Magsol

People also ask

How do you create a broadcast variable in PySpark?

A PySpark broadcast variable is created with the broadcast(v) method of the SparkContext class, which takes the value v to be broadcast.
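
For instance, a minimal sketch (names and values are illustrative):

from pyspark import SparkContext

sc = SparkContext("local", "broadcast-example")

# broadcast a small lookup table to every executor
states = {"NY": "New York", "CA": "California"}
broadcast_states = sc.broadcast(states)

print(broadcast_states.value["NY"])   # "New York"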

How do you pass a broadcast variable to UDF in PySpark?

Once a broadcast variable is created, it can be referenced inside a UDF or directly within transformations, and it can even be used as part of a join. It does not need to be passed to the UDF as a parameter; the UDF can refer to it directly.
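
A sketch of that pattern (the DataFrame and column names are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("broadcast-udf").getOrCreate()

states = {"NY": "New York", "CA": "California"}
broadcast_states = spark.sparkContext.broadcast(states)

# the UDF refers to the broadcast variable directly; it is not passed in as a column
@udf(returnType=StringType())
def state_name(code):
    return broadcast_states.value.get(code, "unknown")

df = spark.createDataFrame([("NY",), ("CA",)], ["code"])
df.withColumn("name", state_name("code")).show()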

How do you broadcast variables?

With the Broadcast<T> class. A broadcast variable allows the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. Broadcast variables can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
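
For example, the classic lookup-table pattern (a small hypothetical dict standing in for a large dataset):

from pyspark import SparkContext

sc = SparkContext("local", "broadcast-lookup")

# imagine this dict is large; broadcasting ships it to each executor once,
# instead of serializing it with every task
country_codes = {"us": "United States", "de": "Germany", "jp": "Japan"}
bc_codes = sc.broadcast(country_codes)

rdd = sc.parallelize(["us", "jp", "de"])
print(rdd.map(lambda code: bc_codes.value.get(code, "unknown")).collect())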

What is the difference between broadcast variable and accumulator in Spark?

Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
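
A short sketch contrasting the two (values are illustrative):

from pyspark import SparkContext

sc = SparkContext("local", "shared-variables")

factor = sc.broadcast(10)        # read-only on the workers
negatives = sc.accumulator(0)    # workers can only add to it

def scale(x):
    if x < 0:
        negatives.add(1)         # accumulate from within the tasks
    return x * factor.value      # read the broadcast value

print(sc.parallelize([1, -2, 3, -4]).map(scale).collect())   # [10, -20, 30, -40]
print(negatives.value)           # 2, read back on the driver after the action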


1 Answer

I am not sure I completely understood the question, but if you need the V object inside the worker function then you should definitely pass it as a parameter; otherwise the method is not really self-contained:

def worker(V, element):
    element *= V.value
    return element

Now, in order to use it in map, you need a partial so that map only sees a one-parameter function:

from functools import partial

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    # bind V positionally as the first argument; map then supplies element
    A = sc.parallelize().map(partial(worker, V))
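
An equivalent way to bind V, shown here only as an illustration (the data argument is a placeholder), is to close over it with a lambda:

def SomeMethod(sc, data):
    someValue = rand()                # rand() as in the question
    V = sc.broadcast(someValue)
    # the lambda closes over V, so map still sees a one-argument function
    return sc.parallelize(data).map(lambda element: worker(V, element))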
answered Oct 26 '22 by elyase