Can someone please point me to an example of how to use the Airflow FileSensor? I've googled and haven't found anything yet. Any example would be sufficient. My use case is quite simple:
Wait for a scheduled DAG to drop a file in a path; a FileSensor task then picks it up, reads the content and processes it.
Apache Airflow sensors are a special kind of operator designed to wait for something to happen. When sensors run, they check whether a certain condition is met before they are marked successful and let their downstream tasks execute.
Airflow documentation: soft_fail defines what happens if the sensor fails. If it is set to False (the default), the sensor is allowed to retry; if it is set to True, the task is marked as skipped on failure. If you want the sensor to keep retrying, just make sure it is set to False or left at its default.
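To make the waiting behaviour and the soft fail option a bit more concrete, here is a minimal plain-Python sketch of a poke loop. It is only a simplified model, not Airflow's implementation: the function name run_sensor and the returned state strings are made up for illustration, and retries are left out.

```python
import time
from typing import Callable


def run_sensor(
    poke: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    soft_fail: bool = False,
) -> str:
    """Call poke() repeatedly until it returns True or the timeout passes.

    Returns "success" when the condition is met. On timeout it returns
    "skipped" if soft_fail is True, otherwise "failed".
    """
    started = time.monotonic()
    while not poke():
        if time.monotonic() - started > timeout:
            return "skipped" if soft_fail else "failed"
        # Sleep between checks, like a sensor's poke_interval.
        time.sleep(poke_interval)
    return "success"
```

With a condition that never becomes true, the only difference between the two settings in this model is the final state: "failed" with soft_fail=False and "skipped" with soft_fail=True.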
Airflow also provides an external task sensor (ExternalTaskSensor), which checks the state of a task instance in a different DAG. Once that state is success, the DAG containing the sensor simply goes ahead and executes the task(s) that come next.
From the documentation & source code:
import datetime
from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# https://airflow.apache.org/code.html#airflow.models.BaseOperator
default_args = {
    "depends_on_past": False,
    "start_date": days_ago(1),
    "retries": 1,
    "retry_delay": datetime.timedelta(hours=5),
}

with DAG(
    "file_sensor_test_v1",
    default_args=default_args,
    schedule_interval="*/5 * * * *",
) as dag:
    start_task = DummyOperator(task_id="start")
    stop_task = DummyOperator(task_id="stop")
    # The two values in angle brackets are placeholders to fill in.
    # fs_conn_id takes the name of a connection, not a filesystem path.
    sensor_task = FileSensor(
        task_id="my_file_sensor_task",
        poke_interval=30,
        fs_conn_id=<connection id>,
        filepath=<file or directory name>,
    )
    start_task >> sensor_task >> stop_task
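The example above ends in a dummy task, while the use case in the question also needs to read and process the file once the sensor succeeds. A minimal sketch of a function that could serve as the callable of a PythonOperator placed after the sensor might look like the following. The name process_file and the line-counting "processing" are placeholders invented for illustration, not something from the answer.

```python
from pathlib import Path


def process_file(path: str) -> int:
    """Read the file the sensor waited for and return its non-empty line count."""
    text = Path(path).read_text(encoding="utf-8")
    lines = [line for line in text.splitlines() if line.strip()]
    # Placeholder processing: a real task would parse or load the lines here.
    return len(lines)
```

Keeping the function free of Airflow imports means it can be tried out on a local file before wiring it into the DAG.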
A simple example of a FileSensor task:
second_task = FileSensor(
    task_id="file_sensor_task_id",
    filepath="{{ task_instance.xcom_pull(task_ids='get_filepath_task') }}",
    # fs_conn_id="fs_default",  # default one, commented because not needed
    poke_interval=20,
    dag=dag,
)
Here I'm passing as filepath the value returned by the previous PythonOperator task (whose task_id is get_filepath_task), using xcom_pull.
But it can be any string naming the file or directory whose existence you are checking.
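The answer does not show get_filepath_task itself. Since a PythonOperator pushes the return value of its callable to XCom, the callable only has to return the path as a string. A minimal sketch, assuming the file name depends on the run date, could be the following. The function name, the base directory and the file-name pattern are all made up for illustration, and how the date reaches the function depends on the Airflow version.

```python
import os


def get_filepath(ds: str, base_dir: str = "/tmp/incoming") -> str:
    """Return the path the sensor should wait for, for run date ds (YYYY-MM-DD)."""
    # Hypothetical layout: one CSV per day under base_dir.
    return os.path.join(base_dir, f"report_{ds}.csv")
```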
The fs_conn_id parameter is the name (a string) of a connection that you have available in the UI under Admin/Connections.
The default value of fs_conn_id is "fs_default" (you can see it in the code of the FileSensor class). Check Admin/Connections in the UI and you will find it.
You can skip fs_conn_id and pass only the filepath parameter if you want to check whether a file or a directory exists locally.
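As far as I can tell from the sensor's source, the check boils down to joining the connection's base path with filepath and testing whether something exists there. The following plain-Python function is a rough approximation of that idea, not Airflow's code. The function name is made up, and the rule that a directory only counts when it contains at least one file is my reading of the source rather than documented behaviour.

```python
import os


def file_or_nonempty_dir_exists(base_path: str, filepath: str) -> bool:
    """Approximate a file sensor's check for a file or a non-empty directory."""
    full_path = os.path.join(base_path, filepath)
    if os.path.isfile(full_path):
        return True
    if os.path.isdir(full_path):
        # A directory counts only if some file exists somewhere below it.
        return any(files for _, _, files in os.walk(full_path))
    return False
```

Note that os.path.join discards base_path when filepath is absolute, so in this sketch an absolute filepath is checked as-is regardless of the base path.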
The poke_interval is inherited from BaseSensorOperator and indicates the time in seconds that the job should wait between tries. The default value is 60 seconds.