
Event-based triggering and running an Airflow task on dropping a file into an S3 bucket

Is it possible to run an Airflow task only when a specific event occurs, such as a file being dropped into a specific S3 bucket? Something similar to AWS Lambda events.

There is S3KeySensor, but I don't know whether it does what I want (run the task only when an event occurs).

Here is an example to make the question clearer:

I have a sensor task defined as follows:

from airflow.sensors.s3_key_sensor import S3KeySensor  # Airflow 1.10.x import path

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag
)

With the above sensor, Airflow behaves as follows for the sensor task:

  • Airflow runs the task if there is already an object matching the wildcard in the S3 bucket my-sensor-bucket, even before the DAG is switched on in the Airflow admin UI (I don't want the task to run because of pre-existing S3 objects).
  • After running once, the sensor task does not run again when a new S3 object is dropped (I want the sensor task and the subsequent tasks in the DAG to run every single time a new S3 object is dropped into the bucket my-sensor-bucket).
  • If you configure the scheduler, the tasks run on a schedule, not on events, so the scheduler does not seem to be an option here.

I'm trying to understand whether tasks in Airflow can only be run on a schedule (like cron jobs) or via sensors (once, based on sensing criteria), or whether Airflow can be set up as an event-based pipeline (something similar to AWS Lambda).

Kingz asked Nov 04 '19 13:11



1 Answer

Airflow is fundamentally organized around time-based scheduling.

You can hack around this to get what you want, though, in a few ways:

  1. Configure an S3 event notification (for example via SQS) that triggers an AWS Lambda, and have the Lambda call the Airflow API to trigger a DAG run (see the first sketch below).
  2. Make the DAG start with an SQS sensor; when it receives the S3 change event, it simply proceeds with the rest of the DAG (see 3_1 and 3_2 for rescheduling, and the second sketch below).
  3. Make the DAG start with a sensor like the one you show. The sensor doesn't choose which task to run; it simply hands off to the next dependent tasks, or times out. You'd have to delete the key that made the sensor match:
    1. Rerun by making the final task re-trigger the DAG.
    2. Or set the schedule interval to every minute, with no catchup and max active DAG runs set to 1. That way one run is active at a time, and the sensor holds it until it times out; whether it completes or times out, the next run starts within a minute (see the third sketch below).

If you go with route 3, you'll need to delete the keys that satisfied the sensor before the next run of the DAG and its sensor. Note that due to S3's eventual consistency, routes 1 and 2 are more reliable.
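
For route 1, a Lambda handler along these lines can forward each S3 notification to Airflow. This is a minimal sketch assuming Airflow 2's stable REST API with basic auth enabled and the bucket's event notification pointed directly at the Lambda; the DAG id s3_event_dag and the AIRFLOW_URL/AIRFLOW_AUTH environment variables are hypothetical.

import base64
import json
import os
import urllib.request

AIRFLOW_URL = os.environ["AIRFLOW_URL"]    # e.g. "https://airflow.example.com" (hypothetical)
AIRFLOW_AUTH = os.environ["AIRFLOW_AUTH"]  # "user:password" for basic auth (hypothetical)
DAG_ID = "s3_event_dag"                    # hypothetical DAG id

def handler(event, context):
    # Each record is one S3 object-created notification delivered to the Lambda.
    for record in event["Records"]:
        payload = {
            "conf": {
                "bucket": record["s3"]["bucket"]["name"],
                "key": record["s3"]["object"]["key"],
            }
        }
        req = urllib.request.Request(
            f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns",
            data=json.dumps(payload).encode(),
            headers={
                "Content-Type": "application/json",
                "Authorization": "Basic "
                + base64.b64encode(AIRFLOW_AUTH.encode()).decode(),
            },
        )
        with urllib.request.urlopen(req) as resp:
            print(resp.status, resp.read())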
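
For route 2, the DAG can open with an SqsSensor. A minimal sketch, assuming the apache-airflow-providers-amazon package is installed and an S3-to-SQS notification already exists; the queue URL, DAG id, and task names are hypothetical. The sensor deletes the message it receives and pushes it to XCom under the key messages; re-running is handled as in 3_1/3_2.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.sqs import SqsSensor

def process_new_file(**context):
    # The sensor pushes the received SQS message(s) to XCom under 'messages'.
    messages = context["ti"].xcom_pull(task_ids="wait_for_s3_event", key="messages")
    print("S3 notification payload:", messages)

with DAG(
    dag_id="s3_event_via_sqs",          # hypothetical DAG id
    start_date=datetime(2019, 11, 1),
    schedule_interval=None,             # rescheduling handled as in routes 3_1/3_2
    catchup=False,
) as dag:
    wait_for_s3_event = SqsSensor(
        task_id="wait_for_s3_event",
        sqs_queue="https://sqs.us-east-1.amazonaws.com/123456789012/s3-events",  # hypothetical
        max_messages=1,
    )
    process = PythonOperator(task_id="process_new_file", python_callable=process_new_file)
    wait_for_s3_event >> process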
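
For route 3_2, the question's sensor can be reused inside a DAG that runs every minute with no catchup and a single active run; route 3_1 is noted in a comment. A minimal sketch with Airflow 2-style imports; the DAG id and the cleanup logic that deletes matched keys are hypothetical.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def delete_matched_keys():
    # Route 3 requires removing the key(s) that satisfied the sensor,
    # otherwise the next run would match the same old objects again.
    hook = S3Hook()
    keys = hook.list_keys(bucket_name="my-sensor-bucket", prefix="file-to-watch-")
    if keys:
        hook.delete_objects(bucket="my-sensor-bucket", keys=keys)

with DAG(
    dag_id="s3_sensor_loop",            # hypothetical DAG id
    start_date=datetime(2019, 11, 1),
    schedule_interval="* * * * *",      # route 3_2: a new run starts every minute...
    catchup=False,                      # ...without backfilling missed intervals...
    max_active_runs=1,                  # ...and only one run holds the sensor at a time
) as dag:
    sensor = S3KeySensor(
        task_id="run_on_every_file_drop",
        bucket_key="file-to-watch-*",
        wildcard_match=True,
        bucket_name="my-sensor-bucket",
        timeout=18 * 60 * 60,
        poke_interval=120,
    )
    cleanup = PythonOperator(task_id="cleanup", python_callable=delete_matched_keys)
    sensor >> cleanup
    # Route 3_1 alternative: drop the schedule (schedule_interval=None) and append
    # a TriggerDagRunOperator (from airflow.operators.trigger_dagrun) with
    # trigger_dag_id="s3_sensor_loop" as the final task, so each run kicks off the next.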

dlamblin answered Sep 29 '22 23:09