
Trigger dag via file watcher in airflow

Tags:

python

airflow

I came across this article using the watchdog API, and it seemed like exactly what I need: https://medium.com/@phanikumaryadavilli/hacking-apache-airflow-to-trigger-dags-based-on-filesystem-events-25f822fd08c3

(code was not written by me)

import os
import time
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime, timedelta


dag = DAG(dag_id="test_trigger_dag_operator",default_args={"owner":"Airflow", "start_date":datetime(2020,3,9)})

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    dag=dag,
    )

class Handler(FileSystemEventHandler):
    def on_created(self, event):
        if event.event_type == 'created':
            print("file created")
            print('Executing the dag')
            trigger

def main():
    observer = Observer()
    event_handler = FileSystemEventHandler()
    observer_path = os.getcwd()
    observer.schedule(Handler(), observer_path, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

if __name__ == '__main__':
    main()

Unfortunately, using the author's code, the only thing the DAG does is unconditionally trigger the target DAG; main() never gets called, so there is no file-watching either.

I made some slight modifications to the code, adding a python_callable argument to the TriggerDagRunOperator as well as adding the necessary args to main (context, dag_run_obj):

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dag_run_operator",
    trigger_dag_id="dummy_operator",
    conf={"message": "Hello World"},
    python_callable=main,
    dag=dag,
    )

and removing the

if __name__ == '__main__':
    main()

part.

Now the file watchdog is working, but the target DAG is still triggered once in any case, and the scheduler hangs as soon as the DAG starts (which is kind of what one would expect with while True). How can I use the provided code in a working manner?

asked Sep 03 '25 by Chris


1 Answer

Airflow has its own service called DagBag filling, which parses your DAG file and puts the DAGs into the DagBag; the DagBag is the collection of DAGs you see both in the UI and in the metadata DB.

While filling the DagBag from your file (i.e. parsing any DAG defined in it), the parse actually never finishes: you are running the watcher's infinite loop at module level, inside the DAG definition file itself.
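A minimal stdlib sketch (no Airflow needed, module name and contents are illustrative) of why module-level code is the problem: the scheduler repeatedly imports every file in the dags/ folder, and anything at the top level runs on every import.

```python
import types

# Simulate the scheduler importing a DAG file: top-level code runs immediately.
dag_file_source = """
events = []
events.append("DAG object constructed")    # dag = DAG(...)
events.append("operators instantiated")    # TriggerDagRunOperator(...)
# main()  <- a `while True: time.sleep(1)` loop here would never return,
#            so this "import" (the DagBag fill) would hang forever.
"""

fake_module = types.ModuleType("my_dag_file")
exec(dag_file_source, fake_module.__dict__)
print(fake_module.events)  # both lines already ran at parse time, before any task
```

This is why the trigger fired "unconditionally": everything at module level executes during parsing, not during a task run.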

To avoid this scenario you'll need to implement a Sensor:

Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…

A sensor will do exactly what you want while preventing your environment from crashing. Essentially, you reimplement your main function's file check inside the overridden poke() method of a custom sensor.

You can check any of the existing sensors in the contrib repo, or write a custom sensor based on your needs.

Alternatively, if you just want to trigger a DAG from an external application, you can submit a POST request for that dag_id through the REST API.
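A sketch of building such a request against the Airflow 1.10 experimental REST API (the host/port is an assumption, and Airflow 2 moved this endpoint to /api/v1/dags/{dag_id}/dagRuns):

```python
import json

AIRFLOW_BASE_URL = "http://localhost:8080"  # assumption: local webserver


def build_trigger_request(dag_id, conf=None):
    """Build the URL and JSON body for the experimental trigger endpoint
    (POST /api/experimental/dags/<dag_id>/dag_runs in Airflow 1.10.x)."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(AIRFLOW_BASE_URL, dag_id)
    body = json.dumps({"conf": conf or {}})
    return url, body


# Actually sending it requires the `requests` package and a running webserver:
# import requests
# url, body = build_trigger_request("dummy_operator", {"message": "Hello World"})
# requests.post(url, data=body, headers={"Content-Type": "application/json"})
```

Your external file watcher (watchdog or anything else) can then live entirely outside Airflow and simply fire this request from its on_created handler.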

Either implementation will fix your issue.

answered Sep 04 '25 by Alejandro Kaspar