Can an Airflow task dynamically generate a DAG at runtime?

I have an upload folder that gets irregular uploads. For each uploaded file, I want to spawn a DAG that is specific to that file.

My first thought was to do this with a FileSensor that monitors the upload folder and, conditional on the presence of new files, triggers a task that creates the separate DAGs. Conceptually:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)

In my initial implementation, CreateDAGTask was a PythonOperator that created DAG globals by placing them in the global namespace (see this SO answer), like so:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
from pathlib import Path

UPLOAD_LOCATION = "/opt/files/uploaded"

# Dynamic DAG generation task code, for the Sensor_DAG below
def generate_dags_for_files(location=UPLOAD_LOCATION, **kwargs):
    dags = []
    for filepath in Path(location).glob('*'):
        dag_name = f"process_{filepath.name}"
        dag = DAG(dag_name, schedule_interval="@once", default_args={
            "depends_on_past": True,
            "start_date": datetime(2020, 7, 15),
            "retries": 1,
            "retry_delay": timedelta(hours=12)
        }, catchup=False)
        dag_task = DummyOperator(dag=dag, task_id=f"start_{dag_name}")

        dags.append(dag)

        # Try to place the DAG into globals(), which doesn't work
        globals()[dag_name] = dag

    return dags

The main DAG then invokes this logic via a PythonOperator:

# File-sensing DAG
default_args = {
    "depends_on_past" : False,
    "start_date"      : datetime(2020, 7, 16),
    "retries"         : 1,
    "retry_delay"     : timedelta(hours=5),
}
with DAG("Sensor_DAG", default_args=default_args,
         schedule_interval= "50 * * * *", catchup=False, ) as sensor_dag:

    start_task  = DummyOperator(task_id="start")
    stop_task   = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="my_file_sensor_task",
                             poke_interval=60,
                             filepath=UPLOAD_LOCATION)
    process_creator_task = PythonOperator(
        task_id="process_creator",
        python_callable=generate_dags_for_files,
    )
    start_task >> sensor_task >> process_creator_task >> stop_task

But that doesn't work, because by the time process_creator_task runs, Airflow has already parsed the module's globals. Globals added after parse time are never picked up.

Interim solution

Per Airflow dynamic DAG and task Ids, I can achieve what I'm trying to do by omitting the FileSensor task altogether and letting Airflow generate the per-file DAGs at each scheduler heartbeat, replacing the whole Sensor_DAG with a bare module-level call:

Update: never mind -- while this does make the DAGs appear in the dashboard, actual execution runs into the "DAG seems to be missing" issue:

generate_dags_for_files()

This does mean that I can no longer regulate the frequency of folder polling with the poke_interval parameter of FileSensor; instead, Airflow will poll the folder every time it collects DAGs.
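
(That collection frequency is itself tunable; as a sketch, these airflow.cfg scheduler settings govern how often DAG files are re-parsed, with illustrative values:)

[scheduler]
# seconds between re-parses of an individual DAG file
min_file_process_interval = 30
# seconds between rescans of the dags/ folder for new files
dag_dir_list_interval = 300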

Is that the best pattern here?

Other related StackOverflow threads

  • Run Airflow DAG for each file and Airflow: Proper way to run DAG for each file: an identical use case, but the accepted answer uses two static DAGs, presumably with different parameters.
  • Proper way to create dynamic workflows in Airflow: the accepted answer dynamically creates tasks, not DAGs, via a complicated XCom setup.
asked Jul 17 '20 by Simon Podhajsky

1 Answer

In short: if the task writes where the DagBag reads from, yes, but it's best to avoid a pattern that requires this. Any DAG you're tempted to custom-create in a task should probably instead be a static, heavily parametrized, conditionally-triggered DAG. y2k-shubham provides an excellent example of such a setup, and I'm grateful for his guidance in the comments on this question.
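
For illustration, here is a minimal sketch of that preferred pattern, assuming Airflow 1.10.x; the DAG ids, the "filepath" conf key, and the file-selection logic are all hypothetical:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
from pathlib import Path

UPLOAD_LOCATION = "/opt/files/uploaded"

# One static, parametrized DAG: the file to process arrives in dag_run.conf,
# so no new DAG definition is ever created at runtime.
with DAG("Process_File_DAG", schedule_interval=None,  # runs only when triggered
         start_date=datetime(2020, 7, 15), catchup=False) as process_dag:

    def process_file(**context):
        filepath = context["dag_run"].conf["filepath"]
        print(f"processing {filepath}")  # real per-file work goes here

    PythonOperator(task_id="process", python_callable=process_file,
                   provide_context=True)  # needed on Airflow 1.10.x

# A callback that decides whether to trigger, and with what conf payload.
def pick_file(context, dag_run_obj):
    files = sorted(Path(UPLOAD_LOCATION).glob("*"))
    if files:
        dag_run_obj.payload = {"filepath": str(files[0])}
        return dag_run_obj
    # Returning None skips the trigger when there is nothing to process.

with DAG("Upload_Watcher_DAG", schedule_interval="50 * * * *",
         start_date=datetime(2020, 7, 16), catchup=False) as watcher_dag:
    TriggerDagRunOperator(task_id="trigger_processing",
                          trigger_dag_id="Process_File_DAG",
                          python_callable=pick_file)

Note that this fires at most one run per schedule; fanning out to one run per file would take multiple trigger tasks or a custom operator.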

That said, here are the approaches that would accomplish what the question asks, however bad an idea it is, in increasing order of ham-handedness:

  • If you dynamically generate DAGs from a Variable (like so), modify the Variable (a sketch of this approach follows the list).
  • If you dynamically generate DAGs from a list of config files, add a new config file to wherever you're pulling config files from, so that a new DAG gets generated on the next DAG collection.
  • Use something like Jinja templating to write a new Python file in the dags/ folder.
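
For example, a minimal sketch of the first approach (the Variable name "files_to_process" and its JSON-list format are assumptions of this sketch):

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# Any task (or the CLI) can Variable.set("files_to_process", ...) to add files;
# this loop re-runs on every DagBag collection and picks up the change.
for filename in Variable.get("files_to_process", default_var=[],
                             deserialize_json=True):
    dag_id = f"process_{filename}"
    dag = DAG(dag_id, schedule_interval="@once",
              start_date=datetime(2020, 7, 15), catchup=False)
    DummyOperator(dag=dag, task_id=f"start_{dag_id}")
    globals()[dag_id] = dag  # works here because this runs at parse time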

To retain access to the newly created DAG after the task that created it runs, you'd have to keep the new DAG definition stable and accessible across future dashboard refreshes / DagBag collections. Otherwise, the Airflow dashboard won't be able to render much about it.

answered Dec 05 '22 by Simon Podhajsky