
How to Trigger a DAG on the success of another DAG in Airflow using Python?

I have two Python DAGs, a Parent Job and a Child Job. The tasks in the Child Job should be triggered on successful completion of the Parent Job's tasks, which run daily. How can I add an external job trigger?

MY CODE

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
  task_id='data_sql',
  postgres_conn_id='REDSHIFT_CONN',
  sql="SELECT * FROM athena_rs.shipments limit 5",
  dag=dag
)
aeapen asked Apr 30 '20 02:04


2 Answers

The answer is already in this thread. Below is demo code:

Parent dag:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

Child dag:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

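# Use ExternalTaskSensor to wait for the cook_dinner task of Parent_dag.
# Once cook_dinner has succeeded, the downstream tasks of Child_dag can run.
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    # Both DAGs run '@daily' from the same start_date, so execution_delta is
    # left unset: the sensor then checks the parent run with the same execution date.
    timeout=3600,  # give up after one hour of waiting
    dag=dag,  # attach the sensor to Child_dag
)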

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
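
Which Parent_dag run the sensor waits for is easy to get wrong, so here is a rough sketch of the date arithmetic in plain Python. It is not Airflow code, and the helper name parent_execution_date is made up for illustration. It assumes the documented behaviour of ExternalTaskSensor: the external task is looked up at the sensor's own execution date minus execution_delta, or at the same execution date when execution_delta is not set.

```python
from datetime import datetime, timedelta
from typing import Optional


def parent_execution_date(child_execution_date: datetime,
                          execution_delta: Optional[timedelta] = None) -> datetime:
    """Hypothetical helper: the parent execution date the sensor would look up."""
    if execution_delta is None:
        # No delta configured: the sensor checks the same execution date.
        return child_execution_date
    # Delta configured: the sensor looks that far back in time.
    return child_execution_date - execution_delta


child_run = datetime(2020, 4, 29)
same_schedule = parent_execution_date(child_run)
one_hour_earlier = parent_execution_date(child_run, timedelta(hours=1))
```

With both DAGs on '@daily' from the same start date, the parent's runs sit at midnight, so only the first lookup points at a run that exists. A one-hour delta would suit a parent that is scheduled one hour before the child.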

Images: Dags, Parent_dag, Child_dag (screenshots not reproduced)

moon answered Sep 18 '22 22:09


As requested by @pankaj, here is a snippet showing reactive triggering with TriggerDagRunOperator (as opposed to the poll-based triggering of ExternalTaskSensor):

from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..

# our trigger task
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               trigger_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run when all 'tail' tasks have completed / succeeded
tail_tasks_of_first_dag >> my_trigger_task

Note that this snippet is for reference only; it has NOT been tested.
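
The helper that collects the 'tail' tasks is left open in the snippet above. One possible way to write it, as a minimal sketch: keep every task whose list of downstream tasks is empty. The function name find_tail_tasks and the FakeTask stand-in class are made up for illustration so the example runs without Airflow. The only assumption about real tasks is that they expose a downstream_list attribute, as Airflow's BaseOperator does.

```python
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class FakeTask:
    """Hypothetical stand-in for an Airflow task (only task_id and downstream_list)."""
    task_id: str
    downstream_list: List["FakeTask"] = field(default_factory=list)


def find_tail_tasks(tasks: Iterable) -> list:
    """Return the tasks that have no downstream tasks."""
    return [task for task in tasks if not task.downstream_list]


# Mirror the Parent_dag layout: leave_work >> cook_dinner
leave_work = FakeTask("leave_work")
cook_dinner = FakeTask("cook_dinner")
leave_work.downstream_list.append(cook_dinner)

tail_ids = [task.task_id for task in find_tail_tasks([leave_work, cook_dinner])]
```

In a real DAG file the same comprehension could be applied to the DAG's own tasks (for example dag.tasks) before the trigger task is created, so the trigger task itself is not counted as a tail.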


Points to note / References

  • Get all Airflow Leaf Nodes/Tasks
  • Wiring top-level DAGs together
  • What is the difference between airflow trigger rule “all_done” and “all_success”?
y2k-shubham answered Sep 16 '22 22:09