Broken DAG: No module named 'airflow.contrib.gsc_to_gcs'

I'm very new to Airflow/Python, but I can't seem to work out what I need to do to resolve this issue.

Airflow is running via the Puckel Docker image.

Full error is:

Broken DAG : [/usr/local/airflow/dags/xxxx.py] No module named 'airflow.contrib.operators.gsc_to_gcs'

In the Python code, I've written:

from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

I'm guessing I need to install the gcs_to_gcs module, but I'm not sure how to do this.

Any specific instructions would be greatly appreciated :-)

asked May 23 '18 by Matt Laz

2 Answers

The GoogleCloudStorageToGoogleCloudStorageOperator wasn't available in v1.9.0, so you will have to copy the operator file (gcs_to_gcs.py) and the related hook (gcs_hook.py) from the Airflow GitHub repo and paste them into the Airflow folder in your Python environment at the respective locations. Follow the steps below:

Run the following code to find where Apache Airflow is stored on your machine:

pip show apache-airflow

which should produce output like the following in your terminal:

Name: apache-airflow
Version: 2.0.0.dev0+incubating
Summary: Programmatically author, schedule and monitor data pipelines
Home-page: http://airflow.incubator.apache.org/
Author: Apache Software Foundation
Author-email: [email protected]
License: Apache License 2.0
Location: /Users/kaxil/anaconda2/lib/python2.7/site-packages
Requires: iso8601, bleach, gunicorn, sqlalchemy-utc, markdown, flask-caching, alembic, croniter, flask-wtf, requests, tabulate, psutil, jinja2, gitpython, python-nvd3, sqlalchemy, dill, flask, pandas, pendulum, flask-login, funcsigs, flask-swagger, flask-admin, lxml, python-dateutil, pygments, werkzeug, tzlocal, python-daemon, setproctitle, zope.deprecation, flask-appbuilder, future, configparser, thrift
Required-by:

The path after Location: is your site-packages directory; Apache Airflow lives in the airflow folder inside it.
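
If you prefer, the same directory can be found from Python itself; a minimal sketch:

import os
import airflow

# Prints the installed airflow package directory, e.g.
# /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow
print(os.path.dirname(airflow.__file__))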

Now clone the git repo to get those two files:

# Clone the git repo to `airflow-temp` folder
git clone https://github.com/apache/incubator-airflow airflow-temp

# Copy the hook from the cloned repo to where Apache Airflow is located
# Replace LINK_TO_SITE_PACKAGES_DIR with the path you found above
cp airflow-temp/airflow/contrib/hooks/gcs_hook.py LINK_TO_SITE_PACKAGES_DIR/airflow/contrib/hooks/

# For example: for me, it would be 
cp airflow-temp/airflow/contrib/hooks/gcs_hook.py /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow/contrib/hooks/

# Do the same with operator file
cp airflow-temp/airflow/contrib/operators/gcs_to_gcs.py LINK_TO_SITE_PACKAGES_DIR/airflow/contrib/operators/

# For example: for me, it would be 
cp airflow-temp/airflow/contrib/operators/gcs_to_gcs.py /Users/kaxil/anaconda2/lib/python2.7/site-packages/airflow/contrib/operators/

Re-run the Airflow webserver and scheduler, and the import should now work.
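
For reference, a minimal sketch of how the operator might then be used in a DAG (the DAG boilerplate, bucket names, and object paths here are placeholders, not from the original question):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

# Hypothetical DAG; bucket names and object paths are placeholders
with DAG(dag_id='gcs_copy_example',
         start_date=datetime(2018, 5, 1),
         schedule_interval=None) as dag:

    copy_file = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='copy_file',
        source_bucket='source-bucket-name',
        source_object='path/to/file.csv',
        destination_bucket='destination-bucket-name',
        destination_object='path/to/file.csv',
    )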

answered Oct 21 '22 by kaxil


I know this is an old question, but I just tried to use this same operator and received the same message, since Cloud Composer still doesn't support GoogleCloudStorageToGoogleCloudStorageOperator.

I managed to achieve what I needed with a workaround using a simple BashOperator:

from datetime import timedelta

from airflow import models
from airflow.operators.bash_operator import BashOperator

# dag_name and default_dag_args are defined elsewhere in the file
with models.DAG(
        dag_name,
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:

    copy_files = BashOperator(
        task_id='copy_files',
        bash_command='gsutil -m cp <Source Bucket> <Destination Bucket>'
    )

It's very straightforward; you can create folders if you need to, and rename your files.
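
For example, a rename can be done the same way (a hypothetical sketch; the bucket and object names are placeholders):

rename_file = BashOperator(
    task_id='rename_file',
    # gsutil mv renames the object by copying it and deleting the source
    bash_command='gsutil mv gs://my-bucket/old_name.csv gs://my-bucket/new_name.csv'
)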

answered Oct 20 '22 by Ary Jazz