I have a row-wise operation I wish to perform on my dataframe which takes in some fixed variables as parameters. The only way I know how to do this is with the use of nested functions. I'm trying to use Cython to compile a portion of my code, then call the Cython function from within mapPartitions, but it raised the error PicklingError: Can't pickle <cyfunction outer_function.<locals>._nested_function at 0xfffffff>.
When using pure Python, I do
def outer_function(fixed_var_1, fixed_var_2):
    def _nested_function(partition):
        for row in partition:
            yield dosomething(row, fixed_var_1, fixed_var_2)
    return _nested_function
output_df = input_df.repartition(some_col).rdd \
    .mapPartitions(outer_function(a, b))
Right now I have outer_function defined in a separate file, like this
# outer_func.pyx
def outer_function(fixed_var_1, fixed_var_2):
    def _nested_function(partition):
        for row in partition:
            yield dosomething(row, fixed_var_1, fixed_var_2)
    return _nested_function
and this
# runner.py
from outer_func import outer_function
output_df = input_df.repartition(some_col).rdd \
    .mapPartitions(outer_function(a, b))
And this throws the pickling error above.
I've looked at https://docs.databricks.com/user-guide/faq/cython.html and tried to get outer_function. Still, the same error occurs. The problem is that the nested function does not appear in the module's global namespace, so it cannot be looked up by name and serialized.
I've also tried doing this
def outer_function(fixed_var_1, fixed_var_2):
    global _nested_function
    def _nested_function(partition):
        for row in partition:
            yield dosomething(row, fixed_var_1, fixed_var_2)
    return _nested_function
This throws a different error: AttributeError: 'module' object has no attribute '_nested_function'.
Is there any way of not using nested function in this case? Or is there another way I can make the nested function "serializable"?
Thanks!
EDIT: I also tried doing
# outer_func.pyx
class PartitionFuncs:
    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2
    def nested_func(self, partition):
        for row in partition:
            yield dosomething(row, self.fixed_var_1, self.fixed_var_2)

# main.py
from outer_func import PartitionFuncs
p_funcs = PartitionFuncs(a, b)
output_df = input_df.repartition(some_col).rdd \
    .mapPartitions(p_funcs.nested_func)
And still I get PicklingError: Can't pickle <cyfunction PartitionFuncs.nested_func at 0xfffffff>. Oh well, the idea didn't work.
This is a sort-of-half answer: when I tried your class PartitionFuncs approach, p_funcs.nested_func pickled and unpickled fine for me (I didn't try combining it with PySpark, though), so whether the solution below is necessary may depend on your Python version, platform, etc. Pickle should support bound methods from Python 3.4 onwards; however, it looks like PySpark forces the pickle protocol to 3, which will stop that working. There might be ways to change this, but I don't know them.
Nested functions are known not to be pickleable, so that approach definitely won't work. The class approach is the right one.
My suggestion in the comments was to just try pickling an instance of the class, not the bound method. For this to work, an instance of the class needs to be callable, so you rename your function to __call__:
class PartitionFuncs:
    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2
    def __call__(self, partition):
        for row in partition:
            yield dosomething(row, self.fixed_var_1, self.fixed_var_2)
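To check the idea outside of Spark, here is a minimal pure-Python sketch that round-trips a callable instance through pickle and then applies it to an iterator of rows. The body of dosomething is a made-up stand-in, and protocol=3 is used only to mirror the protocol mentioned above. I haven't verified this against a Cython-compiled module or a real cluster, so treat it as a sanity check rather than proof.

```python
import pickle


def dosomething(row, fixed_var_1, fixed_var_2):
    # Hypothetical stand-in for the real row-wise operation.
    return row * fixed_var_1 + fixed_var_2


class PartitionFuncs:
    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2

    def __call__(self, partition):
        for row in partition:
            yield dosomething(row, self.fixed_var_1, self.fixed_var_2)


p_funcs = PartitionFuncs(2, 1)

# Pickle the instance itself (not a bound method), then restore it.
restored = pickle.loads(pickle.dumps(p_funcs, protocol=3))

# The restored object is still callable on an iterator of rows.
result = list(restored(iter([1, 2, 3])))
```

In the PySpark code you would then pass the instance directly, i.e. .mapPartitions(PartitionFuncs(a, b)) instead of .mapPartitions(p_funcs.nested_func).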
This does depend on both the fixed_var variables being pickleable by default. If they're not you can write custom saving and loading methods, as described in the pickle documentation.
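As a rough illustration of custom saving and loading, the sketch below uses __getstate__ and __setstate__ to drop an unpickleable attribute before pickling and rebuild it afterwards. The _lock attribute and the arithmetic in __call__ are invented for the example (a threading.Lock is simply a convenient object that pickle refuses); your real unpickleable member would need its own rebuild logic.

```python
import pickle
import threading


class PartitionFuncs:
    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2
        # Hypothetical unpickleable attribute: locks cannot be pickled.
        self._lock = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict and leave out what pickle cannot handle.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Restore the pickled attributes, then recreate the dropped one.
        self.__dict__.update(state)
        self._lock = threading.Lock()

    def __call__(self, partition):
        for row in partition:
            with self._lock:
                value = row + self.fixed_var_1 + self.fixed_var_2
            yield value


restored = pickle.loads(pickle.dumps(PartitionFuncs(10, 5)))
values = list(restored(iter([1, 2])))
```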
As you point out in your comment, this does mean you need a separate class for each function you define. Options for cutting down the duplication include inheritance, or having a separate PickleableData class that each of the Func classes can hold a reference to.
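One possible reading of the inheritance option, sketched in pure Python: a base class stores the fixed variables and does the iteration, and each subclass only supplies the per-row logic. The names PartitionFuncBase, process_row, AddFunc and ScaleFunc are all hypothetical, and this is just one way to lay it out.

```python
import pickle


class PartitionFuncBase:
    """Hypothetical base class: stores the fixed variables and iterates rows."""

    def __init__(self, fixed_var_1, fixed_var_2):
        self.fixed_var_1 = fixed_var_1
        self.fixed_var_2 = fixed_var_2

    def process_row(self, row):
        # Subclasses override this with their own row-wise operation.
        raise NotImplementedError

    def __call__(self, partition):
        for row in partition:
            yield self.process_row(row)


class AddFunc(PartitionFuncBase):
    def process_row(self, row):
        return row + self.fixed_var_1 + self.fixed_var_2


class ScaleFunc(PartitionFuncBase):
    def process_row(self, row):
        return row * self.fixed_var_1 * self.fixed_var_2


# Each subclass instance pickles the same way as the single-class version.
add_restored = pickle.loads(pickle.dumps(AddFunc(1, 2)))
scale_restored = pickle.loads(pickle.dumps(ScaleFunc(2, 3)))
added = list(add_restored(iter([1, 2])))
scaled = list(scale_restored(iter([1, 2])))
```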