I'm working with virtualenv, and I'm trying to use packages inside the DAG folder. The current state of my airflow_home
directory is:
airflow_home/airflow.cfg
airflow_home/airflow.db
airflow_home/dags/__init__.py
airflow_home/dags/hello_world.py
airflow_home/dags/support/inner.py
airflow_home/dags/support/__init__.py
hello_world.py has the following code:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from dags.support import inner
def print_hello():
    return 'Hello world'

dag = DAG('hello_world', description='simple tutorial DAG',
          schedule_interval='0 12 * * *', start_date=datetime(2017, 8, 20), catchup=False)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
hello_from_inner_operator = PythonOperator(task_id='hello_from_inner', python_callable=inner.hello_from_inner, dag=dag)
dummy_operator >> hello_operator
hello_operator >> hello_from_inner_operator
If I run this script manually, it works. But when I start the airflow scheduler, a
Broken DAG: No module named 'dags'
error appears. What am I doing wrong, and how can I solve this?
You can do it in one of these ways:
- add your modules to one of the folders that Airflow automatically adds to PYTHONPATH,
- add the extra folders where you keep your code to PYTHONPATH, or
- package your code into a Python package and install it together with Airflow.
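A minimal sketch of the second option is to put an extra folder on the interpreter's search path at the top of the DAG file before importing from it; the shared_code folder name under AIRFLOW_HOME is an assumption for illustration, not something from the question:

import os
import sys

# Make a hypothetical extra folder importable by the scheduler/webserver
# processes that parse this DAG file.
EXTRA_CODE_DIR = os.path.join(os.environ.get('AIRFLOW_HOME', '.'), 'shared_code')
if EXTRA_CODE_DIR not in sys.path:
    sys.path.insert(0, EXTRA_CODE_DIR)

# After this, modules placed in airflow_home/shared_code/ can be imported
# directly, e.g. `import my_helpers` for airflow_home/shared_code/my_helpers.py.

Setting the PYTHONPATH environment variable before starting the scheduler achieves the same effect without touching the DAG file.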
To create a DAG in Airflow, you always have to import the DAG class. After the DAG class come the imports of the Operators. Basically, for each Operator you want to use, you have to make the corresponding import. For example, if you want to execute a Python function, you have to import the PythonOperator.
In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. The default DAGs directory is $AIRFLOW_HOME/dags (for example, /opt/bitnami/airflow/dags in the Bitnami image).
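If you want to confirm which folder your installation actually scans for DAGs, recent Airflow versions let you read it from the configuration object (a small sketch, assuming Airflow is installed and AIRFLOW_HOME is set up):

from airflow.configuration import conf

# Print the dags_folder the scheduler scans; by default this is $AIRFLOW_HOME/dags.
print(conf.get('core', 'dags_folder'))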
Use
from support import inner
instead. The path $AIRFLOW_HOME/dags is added to sys.path when Airflow starts, so it searches for modules under the dags directory.
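For completeness, here is a sketch of hello_world.py with only the import changed; the contents of support/inner.py are not shown in the question, so the hello_from_inner function below is just an illustrative assumption:

# airflow_home/dags/support/inner.py (contents assumed for illustration)
def hello_from_inner():
    return 'Hello from inner'

# airflow_home/dags/hello_world.py with the corrected import
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from support import inner  # the dags folder itself is on sys.path


def print_hello():
    return 'Hello world'


dag = DAG('hello_world', description='simple tutorial DAG',
          schedule_interval='0 12 * * *', start_date=datetime(2017, 8, 20), catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)
hello_from_inner_operator = PythonOperator(task_id='hello_from_inner',
                                           python_callable=inner.hello_from_inner, dag=dag)

dummy_operator >> hello_operator
hello_operator >> hello_from_inner_operator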