
How do I resolve a Pickling Error on class apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum?

A PicklingError is raised when I run my data pipeline remotely. The pipeline is written with the Beam SDK for Python and runs on top of Google Cloud Dataflow; it works fine when I run it locally.

The following code generates the PicklingError and should reproduce the problem:

import apache_beam as beam
from apache_beam.transforms import pvalue
from apache_beam.io.fileio import _CompressionType
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import GoogleCloudOptions
from apache_beam.utils.options import SetupOptions
from apache_beam.utils.options import StandardOptions

if __name__ == "__main__":
  pipeline_options = PipelineOptions()
  # Run the job remotely on the Dataflow service (blocks until it finishes).
  pipeline_options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
  # Ship the __main__ session (its global objects) to the remote workers.
  pipeline_options.view_as(SetupOptions).save_main_session = True
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = "project-name"
  google_cloud_options.job_name = "job-name"
  google_cloud_options.staging_location = 'gs://path/to/bucket/staging'
  google_cloud_options.temp_location = 'gs://path/to/bucket/temp'
  p = beam.Pipeline(options=pipeline_options)
  p.run()

Below is a sample from the beginning and the end of the traceback:

WARNING: Could not acquire lock C:\Users\ghousains\AppData\Roaming\gcloud\credentials.lock in 0 seconds
WARNING: The credentials file (C:\Users\ghousains\AppData\Roaming\gcloud\credentials) is not writable. Opening in read-only mode. Any refreshed credentials will only be valid for this run.
Traceback (most recent call last):
  File "formatter_debug.py", line 133, in <module>
    p.run()
  File "C:\Miniconda3\envs\beam\lib\site-packages\apache_beam\pipeline.py", line 159, in run
    return self.runner.run(self)
    ....
    ....
    ....
  File "C:\Miniconda3\envs\beam\lib\sitepackages\apache_beam\runners\dataflow_runner.py", line 172, in run
    self.dataflow_client.create_job(self.job))    
    StockPickler.save_global(pickler, obj)
  File "C:\Miniconda3\envs\beam\lib\pickle.py", line 754, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum
asked Mar 10 '23 by Data Scientist

2 Answers

I've found that your error gets raised when a Pipeline object is included in the context that gets pickled and sent to the cloud:

pickle.PicklingError: Can't pickle <class 'apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum'>: it's not found as apache_beam.internal.clients.dataflow.dataflow_v1b3_messages.TypeValueValuesEnum

Naturally, you might ask:

  1. What's making the Pipeline object unpickleable when it's sent to the cloud, since normally it's pickleable?
  2. If this were really the problem, then wouldn't I get this error all the time - isn't a Pipeline object normally included in the context sent to the cloud?
  3. If the Pipeline object isn't normally included in the context sent to the cloud, then why is a Pipeline object being included in my case?

(1)

When you call p.run() on a Pipeline with cloud=True, one of the first things that happens is that p.runner.job=apiclient.Job(pipeline.options) is set in apache_beam.runners.dataflow_runner.DataflowPipelineRunner.run.

Without this attribute set, the Pipeline is pickleable. But once this is set, the Pipeline is no longer pickleable, since p.runner.job.proto._Message__tags[17] is a TypeValueValuesEnum, which is defined as a nested class in apache_beam.internal.clients.dataflow.dataflow_v1b3_messages. AFAIK nested classes cannot be pickled (even by dill - see How can I pickle a nested class in python?).
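
As a minimal, standalone sketch of that limitation (my own example, not Beam code), this is roughly what happens under Python 2, which the Dataflow Python SDK targeted at the time: pickle stores a class by its module-level name, so a nested class can't be looked up and you get the same "it's not found as" error.

import pickle

class Outer(object):
  # Inner is only reachable as Outer.Inner; pickle looks for a top-level
  # name "Inner" in __main__ and does not find it.
  class Inner(object):
    pass

try:
  pickle.dumps(Outer.Inner)
except pickle.PicklingError as e:
  # Python 2: "Can't pickle <class '__main__.Inner'>: it's not found as __main__.Inner"
  print(e)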

(2)-(3)

Counterintuitively, a Pipeline object is normally not included in the context sent to the cloud. When you call p.run() on a Pipeline with cloud=True, only the following objects are pickled (and note that the pickling happens after p.runner.job gets set):

  1. If save_main_session=True, then all global objects in the module designated __main__ are pickled. (__main__ is the script that you ran from the command line.) See the sketch just after this list.
  2. Each transform defined in the pipeline is individually pickled
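
To illustrate #1 (my own sketch, not necessarily what the asker actually did): since save_main_session=True pickles every global in __main__, one way to keep the Pipeline object out of that context is to build it inside a function, so it never becomes a module-level global:

import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.utils.options import SetupOptions

def run():
  # The Pipeline exists only as a local variable here, so it is not part of
  # the __main__ globals that get pickled when save_main_session=True.
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(SetupOptions).save_main_session = True
  p = beam.Pipeline(options=pipeline_options)
  # ... apply transforms to p ...
  p.run()

if __name__ == "__main__":
  run()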

In your case, you encountered #1, which is why your solution worked. I actually encountered #2, where I defined a beam.Map lambda function as a method of a composite PTransform. (When composite transforms are applied, the pipeline gets added as an attribute of the transform...) My solution was to define those lambda functions at module level instead, as sketched below.
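
A minimal sketch of what I mean for #2 (the class and function names here are illustrative, not the actual code): a lambda defined inside a composite PTransform closes over self, so pickling it drags the composite, and with it the pipeline, along; a module-level function does not.

import apache_beam as beam

# Problematic pattern: the lambda captures self, so pickling it also pickles
# the composite transform (which holds a reference to the pipeline).
class FormatRecords(beam.PTransform):
  def expand(self, pcoll):  # called apply() in older SDK versions
    return pcoll | beam.Map(lambda x: self._format(x))

  def _format(self, x):
    return str(x)

# Workaround: a module-level function carries no reference to the transform
# or the pipeline, so it pickles cleanly.
def format_record(x):
  return str(x)

class FormatRecordsFixed(beam.PTransform):
  def expand(self, pcoll):
    return pcoll | beam.Map(format_record)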

A longer-term solution would be for us to fix this in the Apache Beam project. TBD!

answered May 12 '23 by jwayne


This should be fixed in the google-dataflow 0.4.4 SDK release with https://github.com/apache/incubator-beam/pull/1485

answered May 12 '23 by Sourabh