 

How to set up multiple Dag directories in airflow

Tags:

python

airflow

I have different Airflow DAGs set up for different Python projects, i.e. one parent DAGs folder /vol/dags with subfolders for DAGs based on the different projects: /vol/dags/project1/project1.py, /vol/dags/project2/project2.py, where DAGS_FOLDER = /vol/dags.

project1.py, for example, imports a function from another Python file in the same directory, i.e. /vol/dags/project1/mycalculator.py. But when I start the Airflow webserver, I get an ImportError:

/vol/dags/project1/$ airflow webserver -p 8080

INFO - Filling up the DagBag from /vol/dags/
ERROR - Failed to import: /vol/dags/project1/project1.py
Traceback (most recent call last):
  File "/Users/xxx/anaconda/lib/python2.7/site-packages/airflow/models.py", line 247, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/vol/dags/project1/project1.py", line 10, in <module>
    from mycalculator import *
ImportError: No module named mycalculator

I tried to import mycalculator.py into project1.py like this:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from mycalculator import *

args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 4, 1),
}

dag = DAG(
    dag_id='project1', default_args=args,
    schedule_interval="@once")
DevEx asked Apr 04 '17


People also ask

How many DAGs can I have in Airflow?

DAGs are defined in standard Python files that are placed in Airflow's DAG_FOLDER. Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks.

Where are Airflow DAG files stored?

The default location for your DAGs is ~/airflow/dags.
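That default can be changed in airflow.cfg (or with the AIRFLOW__CORE__DAGS_FOLDER environment variable); for the /vol/dags layout in the question, the override would look like:

```ini
[core]
# Parent folder that Airflow scans (recursively) for DAG definition files
dags_folder = /vol/dags
```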

How do you trigger another DAG in Airflow?

In order to use the SimpleHttpOperator to trigger another DAG, you need to define the following: endpoint: this should be of the form '/api/v1/dags/<dag-id>/dagRuns', where <dag-id> is the ID of the DAG you want to trigger; data: to trigger a DAG Run using this endpoint, you must provide an execution date.

How to create a DAG in airflow?

The first step is to import the classes you need. To create a DAG in Airflow, you always have to import the DAG class. After the DAG class come the imports of operators: for each operator you want to use, you have to make the corresponding import.

What is a DagRun in Airflow?

A DagRun is an instance of your DAG with an execution date. Once you know what a DAG is, the next question is: what is a "node" in the context of Airflow? In an Airflow DAG, nodes are operators; in other words, a task in your DAG is an operator.

How does the airflow scheduler monitor DAGs?

“The Airflow scheduler monitors all tasks and DAGs. Behind the scenes, it spins up a subprocess, which monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) collects DAG parsing results and inspects active tasks to see whether they can be triggered.” [Airflow Scheduler]

What is the difference between dagfactory and airflow?

DAGFactory: DAGFactory will collect and initialize DAGs from the individual projects under a specific folder; in my case it was airflow/projects. Airflow itself will only initialize the DAGs under the airflow/dags dir, so we will install DAGFactory there.
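The passage describes the pattern without code, so here is a minimal sketch of what such a factory loop might do. The names (collect_dags, the 'dag'-prefix check) are hypothetical stand-ins, not the real DAGFactory API, and a real implementation would test isinstance(obj, airflow.DAG) instead of matching names:

```python
import importlib.util
from pathlib import Path

def collect_dags(projects_dir, target_globals):
    """Load every *.py file under projects_dir and copy any top-level
    object whose name starts with 'dag' into target_globals, so that
    Airflow's DagBag (which only scans the module-level globals of files
    in the DAG folder) can discover them."""
    for py_file in sorted(Path(projects_dir).rglob("*.py")):
        spec = importlib.util.spec_from_file_location(py_file.stem, py_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for name, obj in vars(module).items():
            # In a real factory this would be: isinstance(obj, airflow.DAG)
            if name.startswith("dag"):
                target_globals[f"{py_file.stem}_{name}"] = obj
```

Calling collect_dags("airflow/projects", globals()) from a single loader file inside airflow/dags would then expose every project's DAG to the scheduler without moving the project code itself.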




2 Answers

You can use the packaged DAGs concept to have different DAG folders for different projects. You only need to place a zip of each project in your parent DAG folder.

This way you can bundle each DAG with its dependencies, and your DAG folder stays neat and clean, containing only one zip per project.

You can create a zip that looks like this:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

And your parent dag folder can look something like this:

project1.zip
project2.zip
my_dag3.py
Ayush Chauhan answered Sep 17 '22


Same problem here.

Indeed, imports work in the Airflow context because the DAG_FOLDER has been added to the PYTHONPATH. Adding an __init__.py in project1/ doesn't change anything.

A good solution could be to use relative imports, e.g.

from .mycalculator import *

But relative imports don't work right now because of how Airflow imports DAGs (as explained to me by an Airflow developer).

So for me, the simplest solution was to keep the DAG files at the root, prefixing them with 'project1_' or 'project2_', and to put libs like mycalculator in subfolders.
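A quick way to convince yourself that this layout works is the self-contained sketch below; the file names are hypothetical, standing in for the DAG_FOLDER layout just described:

```python
import sys
import tempfile
from pathlib import Path

# Recreate the suggested layout in a scratch directory that stands in
# for DAG_FOLDER:
#   <dags>/project1_dag.py          <- DAG file at the root
#   <dags>/project1/__init__.py     <- library package in a subfolder
#   <dags>/project1/mycalculator.py
dags = Path(tempfile.mkdtemp())
(dags / "project1").mkdir()
(dags / "project1" / "__init__.py").write_text("")
(dags / "project1" / "mycalculator.py").write_text(
    "def add(a, b):\n    return a + b\n"
)
(dags / "project1_dag.py").write_text(
    "from project1.mycalculator import add\n"
    "result = add(2, 3)\n"
)

# Airflow adds DAG_FOLDER to sys.path before importing each DAG file,
# so the absolute import inside project1_dag.py resolves:
sys.path.insert(0, str(dags))
namespace = {}
exec((dags / "project1_dag.py").read_text(), namespace)
print(namespace["result"])  # 5
```

The absolute import `from project1.mycalculator import ...` succeeds because it is resolved against DAG_FOLDER itself, which is exactly what fails when the DAG file lives one level down inside project1/.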

Nicolas Dufaur answered Sep 16 '22