I have an upload folder that gets irregular uploads. For each uploaded file, I want to spawn a DAG that is specific to that file.
My first thought was to do this with a FileSensor that monitors the upload folder and, conditional on presence of new files, triggers a task that creates the separate DAGs. Conceptually:
Sensor_DAG (FileSensor -> CreateDAGTask)
|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)
In my initial implementation, CreateDAGTask was a PythonOperator that created DAG globals by placing them in the global namespace (see this SO answer), like so:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
from pathlib import Path
UPLOAD_LOCATION = "/opt/files/uploaded"
# Dynamic DAG generation task code, for the Sensor_DAG below
def generate_dags_for_files(location=UPLOAD_LOCATION, **kwargs):
    dags = []
    for filepath in Path(location).glob('*'):
        dag_name = f"process_{filepath.name}"
        dag = DAG(dag_name, schedule_interval="@once", default_args={
            "depends_on_past": True,
            "start_date": datetime(2020, 7, 15),
            "retries": 1,
            "retry_delay": timedelta(hours=12)
        }, catchup=False)
        dag_task = DummyOperator(dag=dag, task_id=f"start_{dag_name}")
        dags.append(dag)
        # Try to place the DAG into globals(), which doesn't work
        globals()[dag_name] = dag
    return dags
The main DAG then invokes this logic via a PythonOperator:
# File-sensing DAG
default_args = {
    "depends_on_past": False,
    "start_date": datetime(2020, 7, 16),
    "retries": 1,
    "retry_delay": timedelta(hours=5),
}

with DAG("Sensor_DAG", default_args=default_args,
         schedule_interval="50 * * * *", catchup=False) as sensor_dag:

    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="my_file_sensor_task",
                             poke_interval=60,
                             filepath=UPLOAD_LOCATION)
    process_creator_task = PythonOperator(
        task_id="process_creator",
        python_callable=generate_dags_for_files,
    )

    start_task >> sensor_task >> process_creator_task >> stop_task
But that doesn't work, because by the time process_creator_task runs, Airflow has already parsed the module's globals; globals added after parse time are never picked up.
Per Airflow dynamic DAG and task Ids, I can achieve what I'm trying to do by omitting the FileSensor task altogether and just letting Airflow generate the per-file DAGs at each scheduler heartbeat, replacing the Sensor_DAG with a plain module-level call to generate_dags_for_files:

generate_dags_for_files()

Update: Never mind -- while this does create a DAG in the dashboard, actual execution runs into the "DAG seems to be missing" issue.
This does mean that I can no longer regulate the frequency of folder polling with the poke_interval parameter of FileSensor; instead, Airflow will poll the folder every time it collects DAGs.
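(How often that collection happens is governed by the scheduler's DAG-file parsing interval rather than by any sensor. As a rough illustration -- the 300-second value is arbitrary and defaults differ between Airflow versions -- it can be adjusted in airflow.cfg:

[scheduler]
# Minimum number of seconds between re-parses of each DAG file; with a
# module-level generate_dags_for_files(), this is effectively how often
# the upload folder gets scanned.
min_file_process_interval = 300
)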
Is that the best pattern here?
In short: if the task writes where the DagBag reads from, yes, but it's best to avoid a pattern that requires this. Any DAG you're tempted to custom-create in a task should probably instead be a static, heavily parametrized, conditionally-triggered DAG. y2k-shubham provides an excellent example of such a setup, and I'm grateful for his guidance in the comments on this question.
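For concreteness, here is a minimal sketch of that recommended shape, using the Airflow 1.10-era imports from the question. The dag_id "process_file" and the "filepath" conf key are illustrative choices, not something y2k-shubham's setup prescribes: a single static DAG reads the file path from dag_run.conf, and is triggered once per file (for example with airflow trigger_dag process_file -c '{"filepath": "..."}' or a TriggerDagRunOperator).

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def process_uploaded_file(**context):
    # The file to process arrives via the trigger's conf payload,
    # so the DAG itself never changes shape per file.
    filepath = context["dag_run"].conf.get("filepath")
    print(f"Processing {filepath}")

with DAG("process_file",                  # hypothetical dag_id
         schedule_interval=None,          # runs only when triggered
         start_date=datetime(2020, 7, 15),
         catchup=False) as process_file_dag:
    process_task = PythonOperator(
        task_id="process_file",
        python_callable=process_uploaded_file,
        provide_context=True,             # required on Airflow 1.10.x
    )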
That said, here is the approach that would accomplish what the question is asking, however bad an idea it is: have the task write the new DAG's definition into a Python file in the dags/ folder, so that it is picked up at the next DagBag collection.

To retain access to the task after it runs, you'd have to keep the new DAG definition stable and accessible across future dashboard updates / DagBag collections. Otherwise, the Airflow dashboard won't be able to render much about it.
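A minimal sketch of what such a task callable might look like, assuming the dags/ folder lives at /opt/airflow/dags and that the generated dag_id and file name are up to you:

import os
from textwrap import dedent

DAGS_FOLDER = "/opt/airflow/dags"   # assumed location of the dags/ folder

def create_dag_file_for(filepath, **kwargs):
    """Write a per-file DAG definition where the DagBag will find it."""
    dag_id = f"process_{os.path.basename(filepath)}"
    dag_source = dedent(f"""\
        from airflow import DAG
        from airflow.operators.dummy_operator import DummyOperator
        from datetime import datetime

        with DAG("{dag_id}", schedule_interval="@once",
                 start_date=datetime(2020, 7, 15), catchup=False) as dag:
            DummyOperator(task_id="start_{dag_id}")
        """)
    # The file has to stay in place afterwards, or the dashboard
    # loses track of the generated DAG.
    with open(os.path.join(DAGS_FOLDER, f"{dag_id}.py"), "w") as f:
        f.write(dag_source)

The question's generate_dags_for_files could call something like this per file instead of mutating globals(); the trade-off, as noted above, is that the generated files must be kept around (and cleaned up deliberately) for the dashboard to keep working.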