 

Airflow - Python file NOT in the same DAG folder


I am trying to use Airflow to execute a simple Python task.

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

If I try, for example:

airflow test python_test print 2015-01-01

It works!

Now I want to put my print_context(ds, **kwargs) function in another Python file. So I create another file called simple_test.py and change:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
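For completeness, here is a minimal sketch of what simple_test.py is assumed to contain (the DAG file would also need an import simple_test line at the top); this is inferred from the original DAG, not shown in the question:

```python
# simple_test.py -- assumed contents, kept next to the DAG file so a
# plain "import simple_test" resolves when Airflow parses the DAG
from __future__ import print_function
from pprint import pprint


def print_context(ds, **kwargs):
    # same callable as before, just moved out of the DAG file
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
```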

Now I try to run again:

airflow test python_test print 2015-01-01

And OK! It still works!

But if I create a module, for example a worker module containing a file SimplePython.py, import it (from worker import SimplePython) and try:

airflow test python_test print 2015-01-01

It gives the message:

ImportError: No module named worker

The questions:

  1. Is it possible to import a module inside a DAG definition?
  2. How is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?
asked Nov 03 '15 by p.magalhaes

People also ask

Where are Airflow DAG files stored?

The path is set by the dags_folder option in airflow.cfg. The default location for your DAGs is ~/airflow/dags.

Why is my DAG not showing in Airflow?

Create a subdirectory called dags in your main project directory and move your DAG there. Then refresh the Airflow UI and you should be able to see it. Note that AIRFLOW_HOME should be set to your main project directory.

How do I import a Python library into Airflow?

You can do it in one of these ways:

  1. Add your modules to one of the folders that Airflow automatically adds to PYTHONPATH.
  2. Add the extra folders where you keep your code to PYTHONPATH.
  3. Package your code into a Python package and install it together with Airflow.
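For the third option, a hypothetical minimal setup.py is sketched below (the package and project names are illustrative, not from the question); once installed with pip install ., the code is importable from any DAG because it sits on the same PYTHONPATH as Airflow itself:

```python
# setup.py -- illustrative package definition; "my-airflow-helpers" and
# the package it finds are made-up names. After `pip install .`, DAG
# files can simply do `import my_airflow_helpers`.
from setuptools import setup, find_packages

setup(
    name='my-airflow-helpers',
    version='0.1.0',
    packages=find_packages(),  # picks up my_airflow_helpers/__init__.py
)
```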


2 Answers

You can package dependencies of your DAG as per:

https://airflow.apache.org/concepts.html#packaged-dags

To allow this you can create a zip file that contains the dag(s) in the root of the zip file and have the extra modules unpacked in directories. For instance you can create a zip file that looks like this:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories as these are considered to be potential packages.
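One way to produce such a zip is sketched below with the stdlib zipfile module (empty placeholder files are created first only so the snippet is self-contained; in practice you would zip your real DAG and package files):

```python
# build_dag_package.py -- a minimal sketch of packaging DAGs with the
# stdlib zipfile module; file names are illustrative, matching the
# layout above
import os
import zipfile

# placeholders so the snippet runs standalone; skip this with real files
os.makedirs('package1', exist_ok=True)
for name in ('my_dag1.py', 'my_dag2.py',
             'package1/__init__.py', 'package1/functions.py'):
    open(name, 'w').close()

with zipfile.ZipFile('my_dags.zip', 'w') as zf:
    zf.write('my_dag1.py')            # DAG files must sit in the zip root
    zf.write('my_dag2.py')
    zf.write('package1/__init__.py')  # helper modules go in subdirectories
    zf.write('package1/functions.py')

print(sorted(zipfile.ZipFile('my_dags.zip').namelist()))
```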

When using the CeleryExecutor, you need to manually sync DAG directories; Airflow doesn't take care of that for you:

https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means

answered Sep 21 '22 by ImDarrenG

While packaging your DAGs into a zip as covered in the docs is the only supported solution I have seen, you can also import modules that are inside the dags folder. This is useful if you sync the dags folder automatically using other tools like puppet & git.

I am not clear on your directory structure from the question, so here is an example dags folder based on a typical python project structure:

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root
        ├── my_dags  # python src root (usually named same as project)
        │   ├── my_test_globals.py  # file I want to import
        │   ├── dag_in_package.py
        │   └── dags
        │        └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

I have left out the (required [1]) __init__.py files. Note the location of the three example DAGs. You would almost certainly use only one of these places for all your DAGs; I include them all here for the sake of example because it shouldn't matter for the import. To import my_test_globals from any of them:

from my_dags.my_dags import my_test_globals 

I believe this means that Airflow runs with the Python path set to the dags directory, so each subdirectory of the dags folder can be treated as a Python package. In my case it was the additional intermediate project-root directory getting in the way of doing a typical intra-package absolute import. Thus, we could restructure this Airflow project like this:

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root & python src root
        ├── my_test_globals.py  # file I want to import
        ├── dag_in_package.py
        ├── dags
        │    └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

So that imports look as we expect them to:

from my_dags import my_test_globals 
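To check the claim about the Python path on your own installation, a throwaway probe like the following (a hypothetical helper, not part of the answer) can be dropped into the dags folder; its output appears in the scheduler or task logs and shows which directories are importable from:

```python
# path_probe.py -- hypothetical sanity check for what's on sys.path
# when Airflow parses files in the dags folder
import sys
from pprint import pprint

# if airflow/dags appears here, its immediate subdirectories can be
# imported as packages (given __init__.py files)
pprint(sys.path)
```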
answered Sep 19 '22 by 7yl4r