 

Any example of Airflow FileSensor?

Tags:

python

airflow

Can anyone point me to an example of how to use the Airflow FileSensor? I've googled and haven't found anything yet. Any example would be sufficient. My use case is quite simple:

Wait for a scheduled DAG to drop a file into a path, have a FileSensor task pick it up, then read the file's content and process it.

asked Feb 20 '19 16:02 by DevEx



People also ask

What are Airflow sensors?

Apache Airflow sensors are a special kind of operator designed to wait for something to happen. When a sensor runs, it repeatedly checks whether a certain condition is met; only once it is does the sensor succeed and let its downstream tasks execute.
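
To make the "check repeatedly" idea concrete, here is a framework-free sketch of the loop a sensor runs. It is not Airflow's implementation: `wait_for` is a hypothetical name, and `poke_interval` / `timeout` only mirror the names of the BaseSensorOperator parameters.

```python
import time
from typing import Callable


def wait_for(condition: Callable[[], bool], poke_interval: float, timeout: float) -> bool:
    """Call `condition` every `poke_interval` seconds until it returns True
    or `timeout` seconds have passed. Returns whether the condition was met."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True   # condition met: downstream tasks may run
        if time.monotonic() >= deadline:
            return False  # gave up: a real sensor would now fail (or skip)
        time.sleep(poke_interval)  # wait before the next check ("poke")


if __name__ == "__main__":
    calls = {"n": 0}

    def third_time_lucky() -> bool:
        calls["n"] += 1
        return calls["n"] >= 3

    print(wait_for(third_time_lucky, poke_interval=0.01, timeout=1.0))
    print(calls["n"])
```

Each call to `condition` corresponds to one "poke"; in Airflow the condition is the sensor's `poke()` method.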

What is soft fail in Airflow?

soft_fail defines what happens when a sensor fails, for example when it times out before its condition is met. If it is False (the default), the task is marked as failed; if it is True, the task is marked as skipped instead. If you want a timeout to count as a real failure, just leave it at False.
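
The rule fits in a tiny truth table. The helper below is hypothetical; in Airflow itself the sensor raises `AirflowSkipException` (soft_fail=True) or `AirflowSensorTimeout` (soft_fail=False) when it times out.

```python
def sensor_final_state(condition_met: bool, soft_fail: bool = False) -> str:
    """Hypothetical helper: the state a sensor task ends in once it stops poking."""
    if condition_met:
        return "success"  # condition seen before the timeout
    # The sensor gave up; soft_fail decides how that is recorded.
    return "skipped" if soft_fail else "failed"


for met in (True, False):
    for soft in (False, True):
        print(f"condition_met={met!s:5} soft_fail={soft!s:5} -> {sensor_final_state(met, soft)}")
```

With the default `all_success` trigger rule, a skipped sensor also causes its downstream tasks to be skipped rather than failed.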

What is the ExternalTaskSensor in Airflow?

Airflow provides a sensor called ExternalTaskSensor that checks the state of a task instance in a different DAG. Once that task has reached the expected state (success by default), the sensor succeeds and the DAG containing it goes ahead and executes the task(s) that come next.
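
Conceptually, each check looks up the other DAG's task for the matching run: by default the run with the same logical (execution) date, or one shifted back by the sensor's `execution_delta` parameter. The sketch below models that with a plain dictionary; the state store, `external_task_done`, and the DAG/task names are hypothetical, and only `execution_delta` and `allowed_states` mirror real ExternalTaskSensor parameters.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical store of task-instance states, keyed by (dag_id, task_id, logical_date).
TaskKey = Tuple[str, str, datetime]


def external_task_done(
    states: Dict[TaskKey, str],
    external_dag_id: str,
    external_task_id: str,
    logical_date: datetime,
    execution_delta: Optional[timedelta] = None,
    allowed_states: Iterable[str] = ("success",),
) -> bool:
    """One conceptual check: is the other DAG's task in an allowed state for the matching run?"""
    # Same logical date by default, or shifted back by execution_delta.
    target = logical_date - execution_delta if execution_delta else logical_date
    return states.get((external_dag_id, external_task_id, target)) in set(allowed_states)


if __name__ == "__main__":
    run = datetime(2019, 2, 20)
    states = {("producer_dag", "write_file", run): "success"}
    print(external_task_done(states, "producer_dag", "write_file", run))
    # A consumer run one hour later, told to look one hour back:
    print(external_task_done(states, "producer_dag", "write_file",
                             run + timedelta(hours=1), execution_delta=timedelta(hours=1)))
```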




2 Answers

From the documentation & source code:

# Airflow 1.10.x import paths. In Airflow 2.x use airflow.sensors.filesystem.FileSensor
# and airflow.operators.empty.EmptyOperator (2.3+) instead.
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator    import DummyOperator
from airflow.utils.dates                 import days_ago

import datetime
import airflow

# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
    "depends_on_past" : False,
    "start_date"      : days_ago( 1 ),
    "retries"         : 1,
    "retry_delay"     : datetime.timedelta( hours= 5 ),
}

with airflow.DAG( "file_sensor_test_v1", default_args= default_args, schedule_interval= "*/5 * * * *", ) as dag:

    start_task  = DummyOperator(  task_id= "start" )
    stop_task   = DummyOperator(  task_id= "stop"  )
    # fs_conn_id is the *name* of a File (path) connection, not a path; the connection's
    # "path" extra is the base directory that filepath is joined to.
    sensor_task = FileSensor( task_id= "my_file_sensor_task", poke_interval= 30,
                              fs_conn_id= "fs_default",
                              filepath= "<file or directory name>" )  # placeholder: your file or directory

    start_task >> sensor_task >> stop_task
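
The DAG above only waits for the file; the question also asks to read and process it. One way, sketched here under assumptions, is to replace `stop_task` with a `PythonOperator` whose callable opens the file. The callable is plain Python so it can be tried outside Airflow; `process_file` and the line-counting "processing" are hypothetical placeholders for your own logic.

```python
from pathlib import Path


def process_file(filepath: str) -> int:
    """Hypothetical processing step: read the sensed file and count its non-empty lines."""
    text = Path(filepath).read_text(encoding="utf-8")
    records = [line for line in text.splitlines() if line.strip()]
    # Replace this with the real processing. Used as a PythonOperator's
    # python_callable, the return value would be pushed to XCom.
    return len(records)


if __name__ == "__main__":
    import os
    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
        tmp.write("a,1\n\nb,2\n")
    print(process_file(tmp.name))  # 2
    os.remove(tmp.name)
```

Wired into the DAG it could look like `process_task = PythonOperator(task_id="process_file", python_callable=process_file, op_kwargs={"filepath": ...})` followed by `start_task >> sensor_task >> process_task`, with the same path the sensor watches.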
answered Oct 22 '22 15:10 by Meghdeep Ray



A simple example of a FileSensor task:

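# Assumes FileSensor has been imported and `dag` is an existing DAG object.
second_task = FileSensor(
    task_id="file_sensor_task_id",
    # Templated: resolves to the value returned by get_filepath_task (its XCom)
    filepath="{{ task_instance.xcom_pull(task_ids='get_filepath_task') }}",
    # fs_conn_id="fs_default",  # the default connection, so it can be omitted
    poke_interval=20,  # seconds between checks
    dag=dag,
)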

Here I'm passing as filepath the value returned by a previous PythonOperator task (task_id get_filepath_task), pulled with xcom_pull. But it can be any string: the path of the file or directory whose existence you are checking.
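
This works because `filepath` is one of FileSensor's template fields, and a `PythonOperator` pushes its callable's return value to XCom under the key `return_value`, which is what `xcom_pull(task_ids='get_filepath_task')` reads by default. So the upstream task only has to return the path. The callable below is a hypothetical example that builds a dated filename; the `/data/incoming` directory and the `export_YYYYMMDD.csv` naming are assumptions.

```python
from datetime import datetime
from pathlib import PurePosixPath


def get_filepath(ds: str, base_dir: str = "/data/incoming") -> str:
    """Hypothetical python_callable for get_filepath_task.

    `ds` is the run date as 'YYYY-MM-DD'; the returned string becomes the XCom value.
    """
    day = datetime.strptime(ds, "%Y-%m-%d").date()  # raises ValueError on a malformed date
    return str(PurePosixPath(base_dir) / f"export_{day:%Y%m%d}.csv")


print(get_filepath("2019-02-20"))  # /data/incoming/export_20190220.csv
```

If the path depends only on the run date, putting the `{{ ds_nodash }}` template variable directly into `filepath` would give the same result without the extra task.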

The fs_conn_id parameter is the name (a string) of a connection defined in the UI under Admin > Connections.

The default value of fs_conn_id is "fs_default" (you can see it in the source code of the FileSensor class). Check Admin > Connections in the UI and you will find it.

You can omit fs_conn_id and pass only filepath if you just want to check whether a file or directory exists locally, i.e. on the machine where the task runs.
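
To see what "exists" means for a connection path plus filepath, here is an approximation of a single FileSensor check, modeled on the `poke` method of FileSensor in Airflow 2.x (simplified: no logging, no `recursive` flag). `basepath` stands in for the "path" stored on the fs_conn_id connection. It is a sketch for experimenting outside Airflow, not the library's code.

```python
import os
from glob import glob


def file_sensor_poke(basepath: str, filepath: str) -> bool:
    """Approximation of one FileSensor check (a sketch, not Airflow's code)."""
    # os.path.join discards basepath when filepath is already absolute.
    full_path = os.path.join(basepath, filepath)
    for path in glob(full_path):  # also expands wildcard patterns such as *.csv
        if os.path.isfile(path):
            return True
        # A directory only counts once at least one file exists beneath it.
        for _, _, files in os.walk(path):
            if files:
                return True
    return False
```

Under this logic an existing but empty directory does not satisfy the sensor, and an absolute filepath makes the connection's path irrelevant.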

The poke_interval parameter is inherited from BaseSensorOperator and is the time in seconds the sensor waits between checks. The default value is 60 seconds.
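
A quick back-of-the-envelope check of what poke_interval means in practice, assuming BaseSensorOperator's default timeout of 7 days (a parameter not discussed above); `approx_pokes` is a hypothetical helper.

```python
from datetime import timedelta

# BaseSensorOperator defaults: check every 60 s, time out after 7 days.
DEFAULT_POKE_INTERVAL = 60  # seconds
DEFAULT_TIMEOUT = int(timedelta(days=7).total_seconds())  # 604800 seconds


def approx_pokes(timeout: float = DEFAULT_TIMEOUT,
                 poke_interval: float = DEFAULT_POKE_INTERVAL) -> int:
    """Roughly how many checks fit before the timeout (ignores time spent inside each check)."""
    return int(timeout // poke_interval)


print(approx_pokes())                  # 10080 checks at the defaults
print(approx_pokes(poke_interval=20))  # 30240 checks with poke_interval=20
```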

answered Oct 22 '22 13:10 by enri
