I am using Apache Beam in Python with Google Cloud Dataflow (2.3.0). When specifying the worker_machine_type parameter as e.g. n1-highmem-2 or custom-1-6656, Dataflow runs the job but always uses the standard machine type n1-standard-1 for every worker.
Does anyone have an idea if I am doing something wrong?
Other topics (here and here) show that this should be possible, so this might be a version issue.
My code for specifying PipelineOptions (note that all other options do work fine, so it should recognize the worker_machine_type parameter):
import os
import sys
from datetime import datetime

import apache_beam as beam

# BUCKET and parse_arguments are assumed to be defined elsewhere (not shown).

def get_cloud_pipeline_options(project):
    options = {
        'runner': 'DataflowRunner',
        'job_name': ('converter-ml6-{}'.format(
            datetime.now().strftime('%Y%m%d%H%M%S'))),
        'staging_location': os.path.join(BUCKET, 'staging'),
        'temp_location': os.path.join(BUCKET, 'tmp'),
        'project': project,
        'region': 'europe-west1',
        'zone': 'europe-west1-d',
        'autoscaling_algorithm': 'THROUGHPUT_BASED',
        'save_main_session': True,
        'setup_file': './setup.py',
        'worker_machine_type': 'custom-1-6656',  # this option is ignored
        'max_num_workers': 3,
    }
    return beam.pipeline.PipelineOptions(flags=[], **options)

def main(argv=None):
    args = parse_arguments(sys.argv if argv is None else argv)
    pipeline_options = get_cloud_pipeline_options(args.project_id)
    pipeline = beam.Pipeline(options=pipeline_options)
PipelineOptions uses argparse behind the scenes to parse its arguments. In the case of machine type, the name of the argument is machine_type, but the flag name is worker_machine_type. This works fine in the following two cases, where argparse does the parsing and is aware of this aliasing:
my_pipeline.py --worker_machine_type custom-1-6656
flags=['--worker_machine_type', 'custom-1-6656', ...]
However, it does not work well with **kwargs. Any additional arguments passed that way are used to substitute for known argument names (but not flag names).
In short, using machine_type would work everywhere. I filed https://issues.apache.org/jira/browse/BEAM-4112 for this to be fixed in Beam in the future.
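The mismatch described above can be reproduced with plain argparse. The following is a minimal sketch of that behaviour, not Beam's actual implementation: build_options is a hypothetical stand-in for PipelineOptions that parses the flags and then applies keyword overrides only to the names argparse already knows (its dest names).

```python
import argparse


def build_options(flags, **overrides):
    """Hypothetical, simplified model of how PipelineOptions resolves values."""
    parser = argparse.ArgumentParser()
    # The flag is --worker_machine_type, but the value is stored as machine_type.
    parser.add_argument('--worker_machine_type', dest='machine_type',
                        default=None)
    known_args, _ = parser.parse_known_args(flags)
    result = vars(known_args)
    # Overrides are matched against argument names (dest), not flag names.
    for name in result:
        if name in overrides:
            result[name] = overrides[name]
    return result


# Parsed by argparse, so the flag name is understood.
from_flag = build_options(['--worker_machine_type', 'custom-1-6656'])
# Passed as a keyword under the flag name: silently ignored.
from_flag_name_kwarg = build_options([], worker_machine_type='custom-1-6656')
# Passed as a keyword under the argument name: applied.
from_dest_kwarg = build_options([], machine_type='custom-1-6656')
```

In this model only the keyword that matches the argument name reaches the result, which is consistent with the question's job falling back to the default machine type.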
This can be solved by using the option name machine_type instead of worker_machine_type. The rest of the code works fine.
The documentation thus mentions the wrong field name.
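If the options dictionary is built from names copied out of the documentation, the rename can be applied in one place before the dictionary is unpacked. This is only a sketch: rename_flag_keys and FLAG_TO_ARGUMENT_NAME are hypothetical names introduced here, and the mapping lists only the one alias discussed in this thread.

```python
# Hypothetical mapping from flag-style keys to the argument names that
# keyword overrides are matched against. Only the alias discussed here is listed.
FLAG_TO_ARGUMENT_NAME = {'worker_machine_type': 'machine_type'}


def rename_flag_keys(options):
    """Return a copy of options with known flag names replaced."""
    return {FLAG_TO_ARGUMENT_NAME.get(key, key): value
            for key, value in options.items()}


options = rename_flag_keys({
    'runner': 'DataflowRunner',
    'worker_machine_type': 'custom-1-6656',
    'max_num_workers': 3,
})
# The result can then be passed on as in the question, e.g.
# beam.pipeline.PipelineOptions(flags=[], **options)
```

Writing 'machine_type' directly in the dictionary has the same effect; the helper only keeps the rename visible in one place.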