I'm trying to invoke a Dataflow flex template from a Python Cloud Function using the following code:
from googleapiclient.discovery import build

def call_dataflow(projectid, job_name, template, parameters):
    dataflow = build('dataflow', 'v1b3')
    # Classic-template endpoint: projects.templates.launch
    request = dataflow.projects().templates().launch(
        projectId=projectid,
        gcsPath=template,
        body={
            'jobName': job_name,
            'parameters': parameters,
        }
    )
    print(f"Start to execute Dataflow Job with name: {job_name}")
    return request.execute()
The same code works when it references a classic template, but with a flex template it fails with the following error:
Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker_v2.py", line 449, in run_background_function
    _function_handler.invoke_user_function(event_object)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker_v2.py", line 268, in invoke_user_function
    return call_user_function(request_or_event)
  File "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker_v2.py", line 265, in call_user_function
    event_context.Context(**request_or_event.context))
  File "/user_code/main.py", line 41, in trigger_cleaning
    response = call_dataflow(project_id, job_name, template, parameters)
  File "/user_code/utils/gcp_utils.py", line 58, in call_dataflow
    'parameters': parameters,
  File "/env/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/env/local/lib/python3.7/site-packages/googleapiclient/http.py", line 915, in execute
    raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/concise-flame-279117/templates:launch?gcsPath=gs%3A%2F%2Filan-artefacts%2Ftemplates%2Ftemplate666.json&alt=json returned "(145ec02dfb186de2): There is no support for job type with environment version . Please try upgrading the SDK to the latest version. You can find the instructions on installing the latest SDK at https://cloud.google.com/dataflow/docs/guides/installing-beam-sdk. If that doesn't work, please contact the Cloud Dataflow team for assistance at https://cloud.google.com/dataflow/support.
The "There is no support for job type with environment version . Please try upgrading the SDK to the latest version" part of that error doesn't make any sense to me, since this code is running inside a Cloud Function.
Any ideas? Thanks in advance!
UPDATE: SOLVED
The following code solves the issue:
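from googleapiclient.discovery import build

def call_dataflow(projectid, location, job_name, template, parameters):
    dataflow = build('dataflow', 'v1b3')
    # Flex templates use a separate, regional endpoint: projects.locations.flexTemplates.launch
    request = dataflow.projects().locations().flexTemplates().launch(
        projectId=projectid,
        location=location,
        body={
            'launchParameter': {
                'jobName': job_name,
                'containerSpecGcsPath': template,
                'parameters': parameters
            }
        }
    )
    print(f"Start to execute Dataflow Job with name: {job_name}")
    return request.execute()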
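To make the difference between the two calls explicit, here is a small sketch that only builds the keyword arguments for each endpoint, based on the two snippets above. The helper names are hypothetical and nothing here calls the API. The classic call passes the template path as gcsPath, while the flex call is regional and nests the spec path under launchParameter.

```python
# Hypothetical helpers: they only build the keyword arguments for each launch
# call so the classic vs. flex request shapes can be compared side by side.

def classic_launch_kwargs(project_id, template_gcs_path, job_name, parameters):
    """Arguments for dataflow.projects().templates().launch(...)."""
    return {
        "projectId": project_id,
        "gcsPath": template_gcs_path,  # classic template file, passed as a query parameter
        "body": {"jobName": job_name, "parameters": parameters},
    }


def flex_launch_kwargs(project_id, location, spec_gcs_path, job_name, parameters):
    """Arguments for dataflow.projects().locations().flexTemplates().launch(...)."""
    return {
        "projectId": project_id,
        "location": location,  # the flex endpoint lives under projects.locations
        "body": {
            "launchParameter": {
                "jobName": job_name,
                "containerSpecGcsPath": spec_gcs_path,  # flex template spec JSON in GCS
                "parameters": parameters,
            }
        },
    }
```

You would then unpack these into the client call, e.g. dataflow.projects().locations().flexTemplates().launch(**flex_launch_kwargs(...)).execute().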
It looks like there is a separate endpoint for running flex templates, projects.locations.flexTemplates.launch (see the docs). This worked for me:
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3', cache_discovery=False)
request = dataflow.projects().locations().flexTemplates().launch(
    projectId='my-project',
    location='us-central1',
    body={
        'launchParameter': {
            'jobName': 'testing-1234',
            'parameters': {
                'some_parameter': 'hello',
            },
            'containerSpecGcsPath': 'gs://my-storage-bucket/flex_templates/do_something.json'
        }
    }
)
response = request.execute()
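If you want to unit-test the launch without hitting the API, one option (a sketch, not the only approach) is to pass the client in as an argument and substitute unittest.mock.MagicMock in tests. The function name launch_flex_template and the job-id extraction are assumptions; the latter relies on the launch response containing a top-level job object with an id, as described for LaunchFlexTemplateResponse in the Dataflow REST reference.

```python
from unittest.mock import MagicMock


def launch_flex_template(dataflow, project_id, location, job_name,
                         spec_gcs_path, parameters):
    """Launch a flex template and return the new job's id, or None.

    `dataflow` is the client from build('dataflow', 'v1b3'); injecting it lets
    the function be exercised with a mock instead of a real API call.
    """
    request = dataflow.projects().locations().flexTemplates().launch(
        projectId=project_id,
        location=location,
        body={
            "launchParameter": {
                "jobName": job_name,
                "containerSpecGcsPath": spec_gcs_path,
                "parameters": parameters,
            }
        },
    )
    response = request.execute()
    # Assumption: response looks like {"job": {"id": ...}}; missing keys yield None.
    return response.get("job", {}).get("id")


if __name__ == "__main__":
    # Stand-in client; the returned job id is made-up sample data.
    fake = MagicMock()
    launch = (fake.projects.return_value.locations.return_value
              .flexTemplates.return_value.launch)
    launch.return_value.execute.return_value = {"job": {"id": "sample-job-id"}}
    print(launch_flex_template(
        fake, "my-project", "us-central1", "testing-1234",
        "gs://my-storage-bucket/flex_templates/do_something.json",
        {"some_parameter": "hello"},
    ))
```

In production you would pass the real client returned by build(...) instead of the mock.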