 

Pyspark and local variables inside UDFs

What exactly happens when I define a local variable, such as a huge list of complex objects, and use it inside a UDF in PySpark? Let me use this as an example:

from pyspark.sql.functions import udf, col

huge_list = [<object_1>, <object_2>, ..., <object_n>]

@udf
def some_function(a, b):
    l = []
    for obj in huge_list:
        l.append(a.operation(obj))
    return l

df2 = df.withColumn('foo', some_function(col('a'), col('b')))

Is it broadcast automatically? Or do the nodes communicate with the master to get its data every time? What are the performance penalties of this approach? Is there a better one? (Considering that it would be worse to build huge_list from scratch every time the UDF is applied.)

asked Nov 11 '18 by holypriest

1 Answer

Looking at the PySpark code, I can see the following happening: once per UDF, the callable is pickled via CloudPickleSerializer. There is also logic that compares the size of the pickled callable against a hardcoded threshold of 1 MB. If the size is larger, the pickled command is broadcast and an object of type pyspark.broadcast.Broadcast is pickled instead (its serialized value is obviously very short, since the object is pretty much a reference). The pickled callable is later read back in the Python worker (worker.py).

My understanding is that a Python process is created by the executors from scratch for every new task execution, and for each UDF that is used it will either get the pickled command and unpickle it, or (for broadcasts) fetch the broadcast's value from the JVM and unpickle that.
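For concreteness, the decision can be sketched roughly like this; prepare_command is a hypothetical name and the 1 MB threshold is the one mentioned above, so treat it as an illustration rather than the actual PySpark source:

from pyspark.serializers import CloudPickleSerializer

def prepare_command(sc, func, threshold=1 << 20):
    # Hypothetical sketch of the logic described above, not PySpark's own function.
    ser = CloudPickleSerializer()
    pickled = ser.dumps(func)
    if len(pickled) > threshold:
        # Large closures are shipped once as a Broadcast; only a small,
        # reference-like Broadcast object is pickled in their place.
        bc = sc.broadcast(pickled)
        pickled = ser.dumps(bc)
    return pickled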

As far as I understand, if a pyspark.broadcast.Broadcast object is created this way, every executor will keep its value for all future lookups by the Python worker.py processes that the executor creates.
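This also means you can take the broadcast into your own hands instead of relying on the size check. A minimal sketch, assuming a SparkSession named spark and the DataFrame df from the question; the string objects and the concatenation inside the UDF are just placeholders:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

# Illustrative stand-in for the question's huge list of complex objects.
huge_list = ['obj_%d' % i for i in range(100000)]

# Broadcast it once explicitly; the value is cached on the executor side after first use.
bc = spark.sparkContext.broadcast(huge_list)

@udf(returnType=ArrayType(StringType()))
def some_function(a, b):
    # bc.value comes from the executor-side cache instead of being pickled
    # into the UDF's closure.
    return [a + obj for obj in bc.value]

df2 = df.withColumn('foo', some_function(col('a'), col('b')))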

So if you want to find out whether a certain function will be broadcast or not, you can repeat the same steps that pyspark does and check whether the pickled object is larger than 1 MB, e.g. like this:

from pyspark.serializers import CloudPickleSerializer

ser = CloudPickleSerializer()
x = [i**2 for i in range(10**5)]
v = ser.dumps(lambda: x)  # pickle a callable that captures x, just as pyspark does
print(len(v))  # 607434 - less than 1 MB, won't be broadcast
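Conversely, a closure whose pickled size exceeds the 1 MB threshold would take the broadcast path; scaling the same example up shows this (the exact byte count depends on the contents, so no precise number is given here):

y = [i**2 for i in range(10**6)]
print(len(ser.dumps(lambda: y)))  # well over 1 MB, so this closure would be broadcast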

Regarding alternative approaches, the only one I see (apart from creating the object anew every time the udf'ed function is called, which, as already explained, would be too expensive) would be to create a module that builds the object in question at import time. In that case the object is created once per task execution. So this pretty much gives you a choice to either (a) deserialize the object once per task execution via CloudPickleSerializer, by simply letting the udf function capture it, or (b) create the object once per task execution by importing a module. Which is faster is a separate question, and I would imagine the answer depends on the object in question; in either case it seems fairly easy to measure. A sketch of option (b) follows below.
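A minimal sketch of option (b), assuming a hypothetical module called big_objects.py that is shipped to the executors (for example via spark-submit --py-files); the list comprehension and the arithmetic inside the UDF are placeholders for the real objects and the real operation:

# big_objects.py - the object is built at import time, once per Python worker process
huge_list = [i**2 for i in range(10**5)]  # stand-in for the real complex objects

# job code
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, LongType

@udf(returnType=ArrayType(LongType()))
def some_function(a, b):
    import big_objects  # Python caches the module after the first import in each worker
    return [a * obj for obj in big_objects.huge_list]

df2 = df.withColumn('foo', some_function(col('a'), col('b')))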

answered Oct 20 '22 by Alexander Pivovarov