
How to trigger an Airflow task only when a new partition/data is available in an AWS Athena table, using a DAG in Python?

I have a scenario like the one below:

  1. Trigger Task 1 and Task 2 only when new data is available for them in the source table (Athena). The trigger for Task 1 and Task 2 should fire when a new data partition arrives during the day.
  2. Trigger Task 3 only on completion of Task 1 and Task 2.
  3. Trigger Task 4 only on completion of Task 3.


My code

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

What is the optimal way of achieving this?

asked Nov 07 '22 by pankaj

1 Answer

I believe your question addresses two major problems:

  1. how to configure the schedule_interval explicitly, since @daily sets up something you're not expecting.
  2. how to trigger and retry the execution of the DAG properly when you depend on an external event to complete the execution.

The short answer: set your schedule_interval explicitly with a cron expression and use sensor operators to check from time to time:

default_args = {
    # number of sensor checks that fit in the daily window (retries must be an int)
    'retries': int((endtime - starttime) * 60 * 60 / poke_interval)
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    ....
    poke_interval=60*5,  # <---- check for the partition every poke_interval seconds
    dag=dag)

where starttime is the time of day your daily window starts, endtime is the last time of day you should check whether the event happened before flagging the run as failed, and poke_interval is the interval (in seconds) at which your sensor operator checks whether the event happened.
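
For example, with purely illustrative numbers (these values are assumptions, just to show the arithmetic): a window that opens at 10:00, closes at 14:00, and a 5-minute poke interval:

starttime = 10            # hour of day the daily window starts
endtime = 14              # last hour of day worth checking before flagging the run as failed
poke_interval = 60 * 5    # seconds between sensor pokes

retries = int((endtime - starttime) * 60 * 60 / poke_interval)
print(retries)  # 48 checks spread across the four-hour window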

How to set the cron expression explicitly instead of leaving your DAG on @daily as you did:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

From the docs, you can see what you are actually doing: @daily - Run once a day at midnight

Which explains why you're getting a timeout error and the run fails after 5 minutes: you set 'retries': 1 and 'retry_delay': timedelta(minutes=5). Airflow tries running the DAG at midnight and the sensor fails; it retries once 5 minutes later, fails again, and the run is flagged as failed.

So basically @daily sets an implicit cron expression of:

@daily -> Run once a day at midnight -> 0 0 * * *

The cron format has the fields below, and you set a field to * whenever you mean "all".

Minute Hour Day_of_Month Month Day_of_Week

So @daily basically says: run this at minute 0, hour 0, on all days of the month, in all months, on all days of the week.

Your case is: run this at minute 0, hour 10, on all days of the month, in all months, on all days of the week. That translates to the cron expression:

0 10 * * *
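
As a quick sanity check, assuming the croniter package is available (Airflow itself depends on it for scheduling), you can confirm what '0 10 * * *' resolves to:

from croniter import croniter
from datetime import datetime

it = croniter('0 10 * * *', datetime(2022, 11, 7))
print(it.get_next(datetime))  # 2022-11-07 10:00:00
print(it.get_next(datetime))  # 2022-11-08 10:00:00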

How to trigger and retry the execution of the DAG properly when you depend on an external event to complete the execution

  1. You could trigger a DAG in Airflow from an external event by using the airflow trigger_dag command. This would be possible if you could somehow have a Lambda function / Python script target your Airflow instance (a minimal sketch follows this list).

  2. If you can't trigger the DAG externally, then use a sensor operator as the OP did, set a poke_interval on it, and set a reasonably high number of retries.
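
A minimal sketch of option 1, assuming an Airflow 1.10 instance whose experimental REST API is enabled and reachable from AWS Lambda (the host name and the event wiring below are hypothetical); the POST to /api/experimental/dags/<dag_id>/dag_runs is the HTTP equivalent of airflow trigger_dag:

import json
import urllib.request

AIRFLOW_HOST = "http://my-airflow-host:8080"  # hypothetical Airflow webserver address


def lambda_handler(event, context):
    # Fired by the external event (e.g. an S3 notification when the new
    # partition lands); starts a run of the Trigger_Job DAG defined above.
    url = AIRFLOW_HOST + "/api/experimental/dags/Trigger_Job/dag_runs"
    payload = json.dumps({"conf": {"triggered_by": "new_partition_event"}}).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))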

answered Nov 14 '22 by Bernardo Stearns Reisen