With the help of this Stack Overflow post I just made a program (the one shown in the post) where, when a file is placed inside an S3 bucket, a task in one of my running DAGs is triggered and I then perform some work using the BashOperator. Once it's done, though, the DAG is no longer in a running state but instead goes into a success state, and if I want it to pick up another file I need to clear all the 'Past', 'Future', 'Upstream', and 'Downstream' activity. I would like to make this program so that it's always running, and anytime a new file is placed inside the S3 bucket it kicks off the tasks.
Can I continue using the S3KeySensor to do this, or do I need to figure out a way of setting up an External Trigger to run my DAG? As of now my S3KeySensor is pretty pointless if it's only ever going to run once.
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval='@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)
I'm wondering if this isn't possible because it would then no longer be a Directed Acyclic Graph, but would instead contain a loop that keeps repeating: sensor -> t1 -> t2 -> sensor -> t1 -> t2 -> sensor -> ...
Update:
My use case is pretty simple: anytime a new file is placed inside a designated AWS S3 bucket, I want my DAG to be triggered and to start my process of various tasks. The tasks will do things like instantiate a new AWS EMR cluster, extract the files from the AWS S3 bucket, perform some AWS EMR activities, and then shut down the AWS EMR cluster. From there the DAG would go back into a waiting state, waiting for new files to arrive in the AWS S3 bucket, and then repeat the process indefinitely.
timeout: the maximum amount of time, in seconds, that the sensor should keep checking the condition. If the condition has not been met by the time this limit is reached, the task fails.
There are different types of Airflow sensors, and the S3KeySensor is one of them. It checks for the presence of a key (a file) in an S3 bucket. The sensor can be configured to check for the key at an interval of seconds or minutes; if it finds the key, control is passed to the next task in the DAG.
For a sensor, mode='reschedule' means that if the sensor's criteria aren't met, the sensor releases its worker slot to other tasks and is rescheduled for a later check. This is very useful when a sensor may have to wait for a long time.
From the Airflow documentation: soft_fail defines what happens when the sensor fails. If set to False the sensor is allowed to retry; if set to True the task is marked as skipped on failure. If you want the sensor to keep retrying, just make sure it is left at False (the default).
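Putting those options together, here's a rough sketch of what the sensor from the question could look like with those parameters set. It assumes Airflow 1.10+, where sensors accept mode and soft_fail and the S3 connection is referenced via aws_conn_id (an Airflow connection name, not an s3:// URL):

from airflow.sensors.s3_key_sensor import S3KeySensor

# Sketch only -- parameter names assume Airflow 1.10+.
sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    aws_conn_id='aws_default',
    poke_interval=120,        # check for the key every 2 minutes
    timeout=18 * 60 * 60,     # give up (fail the task) after 18 hours
    mode='reschedule',        # free the worker slot between checks
    soft_fail=False,          # fail rather than skip when the timeout is hit
    dag=dag)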
Within Airflow, there isn't a concept that maps to an always running DAG. You could have a DAG run very frequently like every 1 to 5 minutes if that suits your use case.
The main thing here is that the S3KeySensor checks until it detects that the first file exists in the key's wildcard path (or times out), and then its downstream tasks run. But when a second, third, or fourth file lands, the sensor will have already completed running for that DAG run; it won't get scheduled to run again until the next DAG run. (The looping idea you described is roughly equivalent to what the scheduler does when it creates DAG runs, except not forever.)
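If you wanted to stay with the sensor approach, a minimal sketch of that "run very frequently" idea (not necessarily a recommendation for your use case) would just change the DAG definition so the scheduler keeps creating new runs, each with a fresh sensor:

# Sketch: instead of schedule_interval='@once', let the scheduler create a new
# DAG run (and therefore a fresh sensor poke cycle) every 5 minutes.
# max_active_runs and catchup (available in Airflow 1.8+) keep runs from piling up.
dag = DAG(
    's3_triggered_emr_cluster_dag',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    max_active_runs=1,
    catchup=False)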
An external trigger definitely sounds like the best approach for your use case, whether that trigger comes via the Airflow CLI's trigger_dag command ($ airflow trigger_dag ...):
https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222
Or via the REST API:
https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89
Both turn around and call the trigger_dag function in the common (experimental) API:
https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67
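For example, a minimal sketch of triggering a run through the experimental REST endpoint from Python (the webserver URL is an assumption and authentication is not shown; adjust for your deployment):

import requests

# Sketch: POST to the experimental API to create a new DAG run.
# Host/port are assumptions; add auth headers if your webserver requires them.
airflow_url = 'http://localhost:8080'
dag_id = 's3_triggered_emr_cluster_dag'

resp = requests.post(
    '{}/api/experimental/dags/{}/dag_runs'.format(airflow_url, dag_id),
    json={})   # an optional {"conf": ...} payload can be included for the run
resp.raise_for_status()
print(resp.json())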
You could, for instance, set up an AWS Lambda function, invoked when a file lands in S3, that makes the trigger-DAG call.
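A minimal sketch of such a Lambda handler, subscribed to the bucket's ObjectCreated notifications, might look like the following. The Airflow URL, networking, and auth are assumptions you'd need to fill in for your environment:

import json
import urllib3

# Sketch of a Lambda handler wired to the S3 bucket's ObjectCreated events.
# AIRFLOW_URL is an assumption -- the Lambda must be able to reach the Airflow
# webserver, and authentication is not shown.
AIRFLOW_URL = 'http://airflow-webserver.example.internal:8080'
DAG_ID = 's3_triggered_emr_cluster_dag'

http = urllib3.PoolManager()

def lambda_handler(event, context):
    for record in event['Records']:
        key = record['s3']['object']['key']   # the file that just landed
        # conf is passed as a JSON string so the triggered run knows which key to process
        http.request(
            'POST',
            '{}/api/experimental/dags/{}/dag_runs'.format(AIRFLOW_URL, DAG_ID),
            body=json.dumps({'conf': json.dumps({'s3_key': key})}),
            headers={'Content-Type': 'application/json'})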