Apache Airflow - trigger/schedule DAG rerun on completion (File Sensor)

Good Morning.

I'm trying to set up a DAG to:

  1. Watch/sense for a file to hit a network folder
  2. Process the file
  3. Archive the file

Using online tutorials and Stack Overflow, I have been able to come up with the following DAG and operators that successfully achieve the objectives. However, I would like the DAG to be rescheduled or rerun on completion so that it starts watching/sensing for another file.

I attempted to set max_active_runs=1 and schedule_interval=timedelta(seconds=5). This does reschedule the DAG, but it starts queuing tasks and locks the file.

Any ideas on how I could rerun the DAG after the archive_task are welcome.

Thanks

DAG CODE

from airflow import DAG
# OmegaFileSensor and ArchiveFileOperator are the plugin operators defined below
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable

default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime.now(),  # note: a fixed start_date is usually preferred
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30),
}

# max_active_runs and schedule_interval are DAG-level arguments (not
# task-level default_args); this is the 5-second rescheduling attempt
# described above
dag = DAG(
    'test_sensing_for_a_file',
    default_args=default_args,
    max_active_runs=1,
    schedule_interval=timedelta(seconds=5))

filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task',
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)


def process_file(**context):
    # Pull the file name that the sensor pushed to XCom
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids='file_sensor_task')
    # filepath is assumed to end with a path separator
    with open(filepath + file_to_process, 'w') as file:
        file.write('This is a test\n')
        file.write('of processing the file')


proccess_task = PythonOperator(
    task_id='process_the_file', 
    python_callable=process_file,
    provide_context=True,
    dag=dag
)

archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    archivepath=archivepath,
    dag=dag)

sensor_task >> proccess_task >> archive_task
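
A note on path handling: the tasks build paths by plain string concatenation (filepath + file_name), so the soucePath_Test and archivePath_Test variables must end with a trailing path separator. A safer sketch using os.path.join (a suggested tweak, not part of the original code; build_path is an illustrative name):

import os

# os.path.join inserts the separator itself, so the configured folder
# values work with or without a trailing slash.
def build_path(folder, file_name):
    return os.path.join(folder, file_name)

# e.g. os.rename(build_path(filepath, file_name), build_path(archivepath, file_name))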

FILE SENSOR OPERATOR

import os
import re

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator


class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath

    def execute(self, context):
        # Pull the file name pushed to XCom by the sensor, then move the file
        file_name = context['task_instance'].xcom_pull(
            'file_sensor_task', key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)


class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        # Succeed (return True) as soon as a file matching the pattern appears
        full_path = self.filepath
        file_pattern = re.compile(self.filepattern)

        for file_name in os.listdir(full_path):
            if re.match(file_pattern, file_name):
                # Share the matched file name with downstream tasks via XCom
                context['task_instance'].xcom_push('file_name', file_name)
                return True
        return False


class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
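
For context on the imports at the top of the DAG file: in Airflow 1.x, a plugin file like this is discovered from the plugins folder, and the OmegaPlugin registration is what makes the custom operators importable. A sketch of the assumed layout (file names are illustrative):

# $AIRFLOW_HOME/
# ├── dags/
# │   └── test_sensing_for_a_file.py   # the DAG code above
# └── plugins/
#     └── omega_plugin.py              # this sensor/operator file
#
# With omega_plugin.py in place, the DAG file's
# "from airflow.operators import OmegaFileSensor, ArchiveFileOperator"
# resolves through the plugins manager.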
asked Jun 26 '17 by Glenn Sampson


2 Answers

Set schedule_interval=None and use the airflow trigger_dag CLI command from a BashOperator to launch the next execution at the completion of the previous one:

from airflow.operators.bash_operator import BashOperator

trigger_next = BashOperator(
    task_id="trigger_next",
    bash_command="airflow trigger_dag 'your_dag_id'",
    dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger_next

You can start the first run manually with the same airflow trigger_dag command, and then the trigger_next task will automatically trigger the next one. We have used this in production for many months now, and it runs perfectly.
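
For completeness, the manual kick-off mentioned above is the same CLI call, here using the DAG id from the question:

airflow trigger_dag test_sensing_for_a_file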

answered Oct 14 '22 by Dmitri Safine


Dmitri's method worked perfectly.

I also found in my reading that setting schedule_interval=None and then using the TriggerDagRunOperator works equally well:

from airflow.operators.dagrun_operator import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun',
    trigger_dag_id="RBCPV99_v2",
    dag=dag)

sensor_task >> proccess_task >> archive_task >> trigger
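
As a side note, the Airflow 1.x TriggerDagRunOperator also accepts a python_callable that can attach a payload to the triggered run, or veto the trigger entirely. A minimal sketch (the conditionally_trigger name and payload contents are illustrative):

def conditionally_trigger(context, dag_run_obj):
    # Attach an optional payload for the next run; return None to skip triggering
    dag_run_obj.payload = {'triggered_by': context['dag'].dag_id}
    return dag_run_obj

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun',
    trigger_dag_id="RBCPV99_v2",
    python_callable=conditionally_trigger,
    dag=dag)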
answered Oct 14 '22 by Glenn Sampson