I built and managed to run a satisfactory pipeline locally with Beam, and I am ready to send the job to Dataflow.
I planned to simply pickle my session with the save_main_session pipeline option, but I ran into a RecursionError when trying to do so. After some trial and error I managed to narrow it down to the way I define my PTransform, using the ptransform_fn decorator.
Please find below a minimal reproducible example:
# my_script.py
from typing import Set

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.transforms.ptransform import ptransform_fn

@ptransform_fn
def my_function(pcoll):
    return pcoll | beam.Create([1])

if __name__ == "__main__":
    options = PipelineOptions()
    options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=options) as p:
        p | my_function()
The full traceback is quite long, but it ends with:

RecursionError: maximum recursion depth exceeded while calling a Python object
(Note that it is the save_main_session=True option that triggers this error: running python -m my_script with the local runner is enough to hit the RecursionError.)
Since ptransform_fn makes my_function behave in an "unpythonic" way (it is called without the pcoll argument it was defined with), it seems the pickler has trouble with the decorated function.
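To see why the call site looks odd, here is a rough, simplified sketch of what ptransform_fn does (this is not Beam's actual implementation, just the idea): it turns the decorated function into a factory that captures the call arguments and returns a PTransform whose expand() eventually invokes the original function.

# Simplified sketch of the ptransform_fn idea; not Beam's actual code.
import apache_beam as beam

def ptransform_fn_sketch(fn):
    def factory(*args, **kwargs):
        class _WrappedPTransform(beam.PTransform):
            def expand(self, pcoll):
                # The original function only receives the pcoll here,
                # when the pipeline applies the transform.
                return fn(pcoll, *args, **kwargs)
        return _WrappedPTransform()
    return factory

So my_function() returns a PTransform instance instead of calling the original function directly, and that wrapped, closure-carrying object is presumably what trips up the session pickler.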
So my final question is: is this a known limitation, and how should I structure my code so that the pipeline can still be shipped to Dataflow?
save_main_session is inherently a bit fragile; for anything non-trivial I recommend putting the logic in a named module that can be imported in your main script (and on your workers).
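A minimal sketch of that approach, assuming a hypothetical module named my_transforms.py that is made importable on the workers (for example by packaging it and passing --setup_file ./setup.py to the Dataflow runner): the transform lives in the module, the main script only imports it, and save_main_session is no longer needed.

# my_transforms.py -- hypothetical module name; package it so the
# Dataflow workers can import it (e.g. via --setup_file ./setup.py).
import apache_beam as beam
from apache_beam.transforms.ptransform import ptransform_fn

@ptransform_fn
def my_function(pcoll):
    return pcoll | beam.Create([1])

# my_script.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from my_transforms import my_function  # imported on the workers, not pickled

if __name__ == "__main__":
    options = PipelineOptions()  # no save_main_session needed
    with beam.Pipeline(options=options) as p:
        p | my_function()

Because the workers import my_function from the module rather than reconstructing it from a pickled __main__ session, the decorated function never has to go through the pickler at all.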