 

No module named airflow.gcp - how to run a Dataflow job that uses Python 3 / Beam 2.15?

When I go to use operators/hooks like the BigQueryHook, I see a message that these operators are deprecated and that I should use the airflow.gcp... operator version. However, when I try to use it in my DAG it fails and says no module named airflow.gcp. I have the most up-to-date Airflow Composer version with beta features and Python 3. Is it possible to install these operators somehow?

I am trying to run a Dataflow job in Python 3 using Beam 2.15. I have tried the virtualenv operator, but that doesn't work because it only allows Python 2.7. How can I do this?

asked Oct 24 '19 by WIT


1 Answer

The newest Airflow version available in Composer is either 1.10.2 or 1.10.3 (depending on the region). As of those versions, these operators still live in the contrib section rather than under airflow.gcp.
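For reference, the Airflow 1.10.x contrib import paths look roughly like this (a sketch; double-check against the exact Composer image version you are running):

# Airflow 1.10.x: the Dataflow operator/hook live under airflow.contrib,
# not airflow.gcp (which only exists in later Airflow releases)
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook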

Focusing on how to run Python 3 Dataflow jobs with Composer, you'd need to wait for a new version to be released. However, if you need an immediate solution, you can try to back-port the fix.

In this case I defined a DataFlow3Hook, which extends the normal DataFlowHook but does not hard-code python2 in the start_python_dataflow method:

class DataFlow3Hook(DataFlowHook):
    def start_python_dataflow(
        ...
        py_interpreter: str = "python3"
    ):

        ...

        # Pass the configurable interpreter instead of the hard-coded "python2"
        self._start_dataflow(variables, name, [py_interpreter] + py_options + [dataflow],
                             label_formatter)

Then we'll have our custom DataFlowPython3Operator calling the new hook:

class DataFlowPython3Operator(DataFlowPythonOperator):

    def execute(self, context):
        ...
        # Instantiate the patched hook instead of the stock DataFlowHook
        hook = DataFlow3Hook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to,
                             poll_sleep=self.poll_sleep)
        ...
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options, py_interpreter="python3")

Finally, in our DAG we just use the new operator:

task = DataFlowPython3Operator(
    py_file='/home/airflow/gcs/data/main.py',
    task_id=JOB_NAME,
    dag=dag)
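For completeness, here's a minimal sketch of how that task could sit inside a DAG file; the DAG id, start date, and JOB_NAME below are illustrative placeholders, not taken from the linked full code:

import datetime
from airflow import models

JOB_NAME = 'beam-python3-job'  # placeholder task/job name

with models.DAG(
        'dataflow_python3_example',                 # placeholder DAG id
        start_date=datetime.datetime(2019, 10, 1),  # placeholder start date
        schedule_interval=None) as dag:

    task = DataFlowPython3Operator(
        py_file='/home/airflow/gcs/data/main.py',
        task_id=JOB_NAME,
        dag=dag)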

See full code here. Job runs with Python 3.6:

[Screenshot: Dataflow job executing with Python 3.6]

Environment details and dependencies used (the Beam job was a minimal example):

softwareConfig:
  imageVersion: composer-1.8.0-airflow-1.10.3
  pypiPackages:
    apache-beam: ==2.15.0
    google-api-core: ==1.14.3
    google-apitools: ==0.5.28
    google-cloud-core: ==1.0.3
  pythonVersion: '3'

Let me know if that works for you. If so, I'd recommend moving the code to a plugin for code readability and to reuse it across DAGs.
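As a rough sketch of that plugin approach (the plugin class, plugin name, and file path below are illustrative, assuming DataFlow3Hook and DataFlowPython3Operator are defined in the same file):

# plugins/dataflow_python3_plugin.py (illustrative file name)
from airflow.plugins_manager import AirflowPlugin

# DataFlow3Hook and DataFlowPython3Operator would be defined above in this file

class DataFlowPython3Plugin(AirflowPlugin):
    # Registers the custom hook/operator so DAGs can import them from
    # airflow.hooks.<name> / airflow.operators.<name>
    name = 'dataflow_python3_plugin'
    hooks = [DataFlow3Hook]
    operators = [DataFlowPython3Operator]

DAGs could then import the operator with from airflow.operators.dataflow_python3_plugin import DataFlowPython3Operator instead of copying the class into each DAG file.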

answered Sep 19 '22 by Guillem Xercavins