I have a scenario like the one below:
Task 1 and Task 2: run only when new data is available for them in the source table (Athena). The trigger for Task 1 and Task 2 should fire when a new data partition arrives for the day.
Task 3: run only on completion of Task 1 and Task 2.
Task 4: run only on completion of Task 3.
My code
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)
execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4.sql",
    params={'limit': '50'},
    dag=dag
)
execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)
execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)
execute_Task4.set_upstream(execute_Task3)
What is the best way of achieving this?
I believe your question addresses two major problems:
1. setting the schedule_interval explicitly, because @daily is setting up something you are not expecting; and
2. how to trigger and retry the DAG properly when its execution depends on an external event.
The short answer: set your schedule_interval explicitly with a cron expression, and use sensor operators to check from time to time:
default_args = {
    'retries': (endtime - starttime) * 60 * 60 // poke_interval
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    ....
    poke_interval=60*5,  # <---- set a poke_interval in seconds
    dag=dag)
where starttime is the time of day your daily run starts, endtime is the last time of day you should keep checking for the event before flagging the run as failed, and poke_interval is the interval (in seconds) at which your sensor operator checks whether the event happened.
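For example, a quick back-of-the-envelope sketch of that retries arithmetic, using purely illustrative numbers (a 10:00 to 16:00 window polled every 5 minutes; none of these values come from the question):

# Illustrative values only: keep checking from 10:00 to 16:00, every 5 minutes.
starttime = 10          # hour of day the daily run starts
endtime = 16            # last hour of day to keep checking before giving up
poke_interval = 60 * 5  # seconds between checks

# Number of retries needed to keep the sensor attempts going across the window.
retries = (endtime - starttime) * 60 * 60 // poke_interval
print(retries)  # -> 72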
How to address the cron job explicitly
Whenever you set your DAG to @daily like you did:
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
from the docs you can see you are actually doing:
@daily - Run once a day at midnight
Which explains why you're getting a timeout error and the DAG fails after 5 minutes: you set 'retries': 1 and 'retry_delay': timedelta(minutes=5). So it tries running the DAG at midnight, the sensor times out, it retries 5 minutes later, fails again, and the task is flagged as failed.
So basically, @daily is setting an implicit cron schedule of:
@daily -> Run once a day at midnight -> 0 0 * * *
The cron expression has the format below, and you set a field to * whenever you mean "all":
Minute Hour Day_of_Month Month Day_of_Week
So @daily is basically saying: run this at minute 0, hour 0, on all days of the month, all months, and all days of the week.
Your case is: run this at minute 0, hour 10, on all days of the month, all months, and all days of the week. That translates to the cron expression:
0 10 * * *
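If you want to sanity-check a cron expression before putting it in the DAG, here is a small optional sketch using the croniter package (which Airflow itself depends on); the base date is just an illustrative value:

from datetime import datetime
from croniter import croniter

# Illustrative base date; croniter iterates scheduled times forward from here.
base = datetime(2020, 4, 1)
itr = croniter('0 10 * * *', base)

# Print the next three run times for '0 10 * * *' (10:00 every day).
for _ in range(3):
    print(itr.get_next(datetime))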
How to trigger and retry the DAG properly when you depend on an external event to complete the execution
You could trigger a DAG in Airflow from an external event by using the command airflow trigger_dag. This would be possible if somehow you could have a Lambda function / Python script target your Airflow instance.
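As a minimal sketch of that external-trigger route, assuming an Airflow 1.10 instance with the experimental REST API enabled and reachable at a hypothetical URL (the host and the handler shape below are illustrative, not something from the question):

import json
import requests

# Hypothetical Airflow webserver URL; adjust to your own instance.
AIRFLOW_URL = 'http://my-airflow-host:8080'
DAG_ID = 'Trigger_Job'

def lambda_handler(event, context):
    # Fire a DAG run via the experimental REST API; equivalent to running
    # `airflow trigger_dag Trigger_Job` on the CLI.
    resp = requests.post(
        '{}/api/experimental/dags/{}/dag_runs'.format(AIRFLOW_URL, DAG_ID),
        data=json.dumps({'conf': {}}),  # optional run configuration
        headers={'Content-Type': 'application/json'},
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()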
If you can't trigger the DAG externally, then use a sensor operator like the OP did, set a poke_interval on it, and set a reasonably high number of retries.
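Putting the pieces together, here is a rough sketch of what that sensor setup could look like; the 10:00 to 16:00 window and the 5-minute interval are assumptions for illustration, and only Task1's sensor is shown:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor

poke_interval = 60 * 5            # check the Glue catalog every 5 minutes
window_hours = 16 - 10            # keep checking from 10:00 until 16:00 (illustrative)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 4, 1),  # illustrative start date
    'retries': window_hours * 60 * 60 // poke_interval,
    'retry_delay': timedelta(seconds=poke_interval),
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    poke_interval=poke_interval,
    timeout=poke_interval,        # let each attempt fail fast and rely on retries
    dag=dag)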