I've been playing with Spark and Python in an online Jupyter notebook at https://tmpnb.org/ and tried three ways to pass Python functions:
1) using map
import numpy as np
def my_sqrt(x):
    return np.sqrt(x)
sc.parallelize(range(10)).map(my_sqrt).collect()
2) parallelizing my_sqrt and calling it
sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()
3) parallelizing np.sqrt and calling it
sc.parallelize([(np.sqrt, i) for i in range(10)]).map(lambda x : x[0](x[1])).collect()
(1) and (3) work and (2) doesn't. First, I would like to understand why/how (1) and (3) work. Second, I would like to understand why (2) doesn't and what could be done to make it work.
The first approach works because Spark uses a special serialization strategy to process the closures required for transformations. It is significantly slower than the standard pickle, but also more powerful (otherwise we couldn't use .map(lambda x: ...)).
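You can see the difference outside Spark with the cloudpickle library, which PySpark's closure serializer is derived from. A minimal standalone sketch (assumes cloudpickle is installed, e.g. via pip install cloudpickle):

import pickle
import cloudpickle

square = lambda x: x * x

# Standard pickle refuses the lambda: it can only store a module/name
# reference, and the name '<lambda>' cannot be looked up in __main__.
try:
    pickle.dumps(square)
except pickle.PicklingError as e:
    print(e)

# cloudpickle serializes the code object itself, so the function
# round-trips even though it exists only in this session:
restored = pickle.loads(cloudpickle.dumps(square))
print(restored(4))  # 16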
The last approach works because there is no need to serialize the function code at all: the pickle references sqrt from the numpy module, so as long as NumPy is accessible on each worker there is no problem.
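You can check this by inspecting the pickle payload for np.sqrt. A quick sketch (the exact bytes vary with the pickle protocol and NumPy version):

import pickle
import numpy as np

payload = pickle.dumps(np.sqrt)
print(payload)
# The payload records where to find the function (the module and
# attribute names) rather than its compiled code; unpickling simply
# re-imports numpy and looks the attribute up again.
print(pickle.loads(payload)(9.0))  # 3.0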
The second approach doesn't work because plain pickling doesn't serialize the function's code:
import pickle
pickle.dumps(my_sqrt)
## b'\x80\x03c__main__\nmy_sqrt\nq\x00.'
All it does is state: please give me an object assigned to my_sqrt (my_sqrt.__name__) from the top-level script environment (a.k.a. __main__). When this is executed on the workers, they don't share the driver's environment, so there is no such object in scope anymore, hence the exception. To be clear, it is neither a bug nor something specific to Spark. You can easily reproduce the same behavior locally:
In [1]: import pickle
In [2]: def foo(): ...
In [3]: foo_ = pickle.dumps(foo)
In [4]: pickle.loads(foo_)
Out[4]: <function __main__.foo>
In [5]: del foo
In [6]: pickle.loads(foo_)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
...
AttributeError: Can't get attribute 'foo' on <module '__main__'>
Since the pickle doesn't concern itself with the actual value, you can even reassign the name:
In [7]: foo = "foo"
In [8]: pickle.loads(foo_)
Out[8]: 'foo'
The take-away message here: if you want to use a function this way, put it in a separate module and distribute that module to the workers the same way you distribute other dependencies, including custom class definitions.
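For example, a sketch of that fix (the module name utils.py is arbitrary):

# utils.py -- a module shipped to every worker
import numpy as np

def my_sqrt(x):
    return np.sqrt(x)

# driver: distribute the module, then reference the function through it
sc.addPyFile("utils.py")  # or: spark-submit --py-files utils.py
from utils import my_sqrt

sc.parallelize([(my_sqrt, i) for i in range(10)]).map(lambda x: x[0](x[1])).collect()

Now the pickle stores the reference utils.my_sqrt, which the workers can resolve because utils.py has been shipped to them.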