What exactly happens when I define a local variable, such as a huge list of complex objects, and use it inside a UDF in PySpark? Let me use this as an example:
from pyspark.sql.functions import udf, col

huge_list = [<object_1>, <object_2>, ..., <object_n>]

@udf
def some_function(a, b):
    l = []
    for obj in huge_list:
        l.append(a.operation(obj))
    return l

df2 = df.withColumn('foo', some_function(col('a'), col('b')))
Is it broadcast automatically? Or do the nodes have to communicate with the master to get its data every time? What performance penalties do I incur with this approach? Is there a better one? (Considering that it would be even worse to build huge_list from scratch every time the UDF is applied.)
Looking at the PySpark code, I can see the following happening: once per UDF, a function is called that pickles the callable via CloudPickleSerializer. It also compares the size of the pickled callable against a hardcoded threshold of 1Mb. If the size is larger, the pickled command is broadcast and an object of type pyspark.broadcast.Broadcast is pickled instead (its serialized value is obviously very short, since the object is essentially a reference). The pickled callable is then read back on the Python worker side. My understanding is that a Python process is created by the executors from scratch for every new task execution, and for each UDF used it will either get the pickled command and unpickle it, or (for broadcasts) fetch the broadcast's value from the JVM and unpickle that.
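As a quick illustration of the capture step (using the standalone cloudpickle package, which PySpark vendors as the backend of CloudPickleSerializer), you can see that a referenced list travels along with the function when it is pickled from a script run as __main__; the functions and sizes below are only stand-ins:

import cloudpickle  # standalone equivalent of PySpark's vendored cloudpickle

huge_list = [i ** 2 for i in range(10 ** 5)]  # stand-in for the real objects

def without_capture(a):
    # does not reference huge_list, so the pickled payload stays tiny
    return a * 2

def with_capture(a):
    # references huge_list, so cloudpickle includes it in the payload
    return [a * obj for obj in huge_list]

print(len(cloudpickle.dumps(without_capture)))  # roughly a few hundred bytes
print(len(cloudpickle.dumps(with_capture)))     # several hundred KB: huge_list rides along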
As far as I understand, if a pyspark.broadcast.Broadcast object is created, every executor will keep its value for all future lookups by the Python worker.py processes that executor creates.
So if you want to know whether a certain function will be broadcast or not, you can repeat the same steps PySpark takes and check whether the pickled object is larger than 1Mb, e.g. like this:
from pyspark.serializers import CloudPickleSerializer
ser = CloudPickleSerializer()
x = [i**2 for i in range(10**5)]
v = ser.dumps(lambda: x)
print(len(v)) # 607434 - less than 1Mb, won't be broadcast
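For contrast, a list with ten times as many elements crosses the threshold (exact sizes vary with the Python and pickle protocol versions, so the numbers below are only indicative):

x_big = [i**2 for i in range(10**6)]
v_big = ser.dumps(lambda: x_big)
print(len(v_big))  # several MB - larger than 1Mb, so the pickled command would be broadcast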
Regarding alternative approaches, the only one I see (apart from creating a new object every time the UDF'ed function is called, which has already been established to be too expensive) would be to create a module that builds the object in question at import time. In that case the object is created once per task execution. So this pretty much gives you a choice: either (a) deserialize the object once per task execution via CloudPickleSerializer by simply letting the UDF function capture it, or (b) build the object once per task execution by importing a module (see the sketch below). Which is faster is a separate question; I would imagine the answer depends on the object in question, but in either case it seems fairly easy to measure.
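For illustration, here is a minimal sketch of option (b). The module name huge_objects.py, the array<bigint> return type and the a * obj operation are placeholders for whatever the real objects and computation are, and the module has to be importable on the executors (e.g. shipped via --py-files):

# huge_objects.py -- hypothetical module, must be importable on the executors.
# The list is built once per import, i.e. once per Python worker process.
HUGE_LIST = [i ** 2 for i in range(10 ** 5)]  # stand-in for the expensive construction

# driver side -- the UDF body only names the module, so the pickled command stays tiny
from pyspark.sql.functions import udf, col

@udf(returnType="array<bigint>")
def some_function(a, b):
    import huge_objects  # resolved on the executor's Python worker
    return [a * obj for obj in huge_objects.HUGE_LIST]

df2 = df.withColumn('foo', some_function(col('a'), col('b')))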