Airflow - Splitting DAG definition across multiple files

Tags:

airflow

Just getting started with Airflow and wondering what the best practices are for structuring large DAGs. For our ETL, we have a lot of tasks that fall into logical groupings, yet the groups depend on each other. Which of the following would be considered best practice?

  • One large DAG file with all tasks in that file
  • Splitting the DAG definition across multiple files (How to do this?)
  • Define multiple DAGs, one for each group of tasks, and set dependencies between them using ExternalTaskSensor

Also open to other suggestions.

asked Jan 05 '17 by R.M.


People also ask

How do I create a dynamic DAG in Airflow?

One method for dynamically generating DAGs is to have a single Python file which generates DAGs based on some input parameter(s) (e.g. a list of APIs or tables). A common use case for this is an ETL or ELT-type pipeline where there are many data sources or destinations.
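
For instance, here is a minimal sketch of that single-file pattern (the table list and the extract callable are hypothetical placeholders; imports assume Airflow 2):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# hypothetical input parameters driving DAG generation
TABLES = ["orders", "customers", "invoices"]

def extract_table(table_name):
    print(f"extracting {table_name}")

for table in TABLES:
    dag_id = f"etl_{table}"
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
    ) as dag:
        PythonOperator(
            task_id=f"extract_{table}",
            python_callable=extract_table,
            op_kwargs={"table_name": table},
        )
    # bind each generated DAG to a module-level name so Airflow discovers it
    globals()[dag_id] = dag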

Can two DAGs be created using a single Python file?

DAGs are defined in standard Python files that are placed in Airflow's DAG_FOLDER. Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks. In general, each one should correspond to a single logical workflow.
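
For example, a minimal sketch of two DAGs defined in one file (assumes Airflow 2.3+, where EmptyOperator is available):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# both DAG objects sit in the global scope, so Airflow registers both
with DAG(dag_id="workflow_a", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag_a:
    EmptyOperator(task_id="start_a")

with DAG(dag_id="workflow_b", start_date=datetime(2021, 1, 1), schedule_interval="@hourly") as dag_b:
    EmptyOperator(task_id="start_b")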


2 Answers

DAGs are just Python files, so you can split a single DAG definition across multiple files. The different files just need functions that take in a DAG object and create tasks using that DAG object.

Note, though, that you should have just a single DAG object in the global scope. Airflow picks up every DAG object in the global scope as a separate DAG.
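
A minimal sketch of that layout, with hypothetical module and function names (imports assume Airflow 2):

includes/extract_tasks.py:

from airflow import DAG
from airflow.operators.python import PythonOperator

def add_extract_task(dag: DAG) -> PythonOperator:
    # builds a task bound to the DAG passed in; defines no DAG of its own
    return PythonOperator(
        task_id="extract",
        python_callable=lambda: print("extracting"),
        dag=dag,
    )

dags/my_dag.py:

from datetime import datetime

from airflow import DAG
from includes.extract_tasks import add_extract_task

# the only DAG object in the global scope of this file
dag = DAG(dag_id="etl", start_date=datetime(2021, 1, 1), schedule_interval="@daily")

extract_task = add_extract_task(dag=dag)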

It is often considered good practice to keep each DAG as concise as possible. However, if you need to set up dependencies like these, you could consider using SubDAGs. More about this here: https://airflow.incubator.apache.org/concepts.html?highlight=subdag#scope
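
A minimal sketch of the SubDAG factory pattern (note that SubDAGs were later deprecated in favor of the TaskGroups covered in the other answer; the import path here assumes Airflow 2):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

def load_subdag(parent_dag_id, child_dag_id, args):
    # the child's dag_id must be "<parent_dag_id>.<child_dag_id>"
    with DAG(
        dag_id=f"{parent_dag_id}.{child_dag_id}",
        default_args=args,
        schedule_interval="@daily",
    ) as subdag:
        EmptyOperator(task_id="load_step")
    return subdag

default_args = {"start_date": datetime(2021, 1, 1)}

with DAG(dag_id="parent_dag", default_args=default_args, schedule_interval="@daily") as dag:
    load_group = SubDagOperator(
        task_id="load_group",
        subdag=load_subdag("parent_dag", "load_group", default_args),
    )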

You could also use an ExternalTaskSensor, but beware that as the number of DAGs grows, it might get harder to handle external dependencies between tasks. I think SubDAGs might be the way to go for your use case.
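
A minimal sketch of the sensor approach (the upstream dag and task ids are hypothetical; in Airflow 1.10 the import path is airflow.sensors.external_task_sensor instead):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(dag_id="downstream_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    # blocks until final_task in upstream_dag succeeds for the same execution
    # date, so both DAGs should run on aligned schedules
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",
        external_task_id="final_task",
    )
    process = EmptyOperator(task_id="process")
    wait_for_upstream >> process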

answered Dec 12 '22 by Vineet Goel


With the advent of TaskGroups in Airflow 2.x, it's worth expanding on a previous answer. TaskGroups are just UI groupings for tasks, but they also serve as handy logical groupings for a bunch of related tasks. The tasks in a TaskGroup can be bundled and abstracted away to make it easier to build a DAG out of larger pieces. That being said, it may still be useful to have a file full of related tasks without bundling them into a TaskGroup.

The trick to breaking up DAGs is to keep the DAG itself in one file, for example my_dag.py, and the logical chunks of tasks or TaskGroups in separate files, one logical chunk or TaskGroup per file. Each file contains functions (or methods, if you prefer an OO approach), each of which returns an operator or TaskGroup instance.

To illustrate, my_dag.py (below) imports operator-returning functions from foo_bar_tasks.py, and it imports a TaskGroup-returning function from xyzzy_taskgroup.py. Within the DAG context, those functions are called and their return values are assigned to task or TaskGroup variables, which can be assigned up-/downstream dependencies.

dags/my_dag.py:

# std lib imports
 
from airflow import DAG
# other airflow imports
 
from includes.foo_bar_tasks import build_foo_task, build_bar_task
from includes.xyzzy_taskgroup import build_xyzzy_taskgroup
 
with DAG(dag_id="my_dag", ...) as dag:
 
    # logical chunk of tasks
    foo_task = build_foo_task(dag=dag, ...)
    bar_task = build_bar_task(dag=dag, ...)
 
    # taskgroup
    xyzzy_taskgroup = build_xyzzy_taskgroup(dag=dag, ...)
 
    foo_task >> bar_task >> xyzzy_taskgroup

plugins/includes/foo_bar_tasks.py:

# std lib imports
 
from airflow import DAG
from airflow.operators.foo import FooOperator
from airflow.operators.bar import BarOperator
# other airflow imports
 
def build_foo_task(dag: DAG, ...) -> FooOperator:
    # ... logic here ...
    foo_task = FooOperator(..., dag=dag)
 
    return foo_task
 
def build_bar_task(dag: DAG, ...) -> BarOperator:
    # ... logic here ...
    bar_task = BarOperator(..., dag=dag)
 
    return bar_task

plugins/includes/xyzzy_taskgroup.py:

# std lib imports
 
from airflow import DAG
from airflow.operators.baz import BazOperator
from airflow.operators.qux import QuxOperator
from airflow.utils.task_group import TaskGroup
# other airflow imports
 
def build_xyzzy_taskgroup(dag: DAG, ...) -> TaskGroup:
    xyzzy_taskgroup = TaskGroup(group_id="xyzzy_taskgroup", dag=dag)
 
    # ... logic here ...
    baz_task = BazOperator(task_id="baz_task", task_group=xyzzy_taskgroup, ...)
 
    # ... logic here ...
    qux_task = QuxOperator(task_id="qux_task", task_group=xyzzy_taskgroup, ...)
 
    baz_task >> qux_task
 
    return xyzzy_taskgroup
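
As a side note, when a grouping is small enough not to warrant its own module, TaskGroup also works as a context manager directly inside the DAG file. A minimal self-contained sketch (task names are hypothetical; assumes Airflow 2.3+ for EmptyOperator):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="inline_group_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    start = EmptyOperator(task_id="start")
    with TaskGroup(group_id="inline_group") as inline_group:
        # tasks created here are automatically assigned to the group
        EmptyOperator(task_id="step_one") >> EmptyOperator(task_id="step_two")
    start >> inline_group
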
answered Dec 12 '22 by L. D. Nicolas May