Does anybody have any idea about FileSensor? I came across it while researching how to sense files in my local directory. The code is as follows:
task = FileSensor(
    task_id="senseFile",
    filepath="etc/hosts",
    fs_conn_id='fs_local',
    _hook=self.hook,
    dag=self.dag)
I have also set up my connection with conn type File (path) and gave it {'path': 'mypath'}, but even when I point it at a non-existent path, or the file isn't in the specified path, the task completes and the DAG succeeds. The FileSensor doesn't seem to sense files at all.
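For reference, here is a minimal stand-alone version of what I expected to work (the import path is the Airflow 1.x contrib one; the connection settings and timings are just placeholders):
from airflow.contrib.sensors.file_sensor import FileSensor

sense_file = FileSensor(
    task_id='senseFile',
    fs_conn_id='fs_local',   # connection of type File (path), extra {"path": "mypath"}
    filepath='etc/hosts',    # resolved relative to the connection's base path
    poke_interval=30,        # seconds between pokes
    timeout=600,             # fail if nothing shows up within 10 minutes
    dag=dag)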
I found the community-contributed FileSensor a little underwhelming, so I wrote my own. (As far as I can tell, the contrib sensor's poke() at the time swallowed errors around os.walk(), and os.walk() doesn't raise for a missing path, so the sensor returned True regardless; that would explain the behaviour described above.)
I got it working for files local to the machine where the server/scheduler was running, but ran into problems when using network paths.
The trick I found for network paths was to mount the network drive to my Linux box.
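Once mounted, the sensor just sees an ordinary local path. As a quick sanity check that the share is actually visible from the scheduler box (the mount point here is hypothetical):
import os

# Hypothetical mount point; the share must be mounted on the machine that
# runs the scheduler/worker, or a local-path sensor will never see the files.
mount_point = '/mnt/network_share'
assert os.path.ismount(mount_point), 'network drive is not mounted'
print(os.listdir(mount_point))  # the files the sensor should be able to poke for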
This is my DAG; it runs sensor_task >> process_task >> archive_task >> trigger to re-run itself.
Note: we use Variables (sourcePath, filePattern and archivePath) entered via the WebGUI.
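If you would rather seed those Variables from code than through the WebGUI, a one-off bootstrap along these lines works (the values are placeholders; note the trailing slashes, since the operators below build paths by plain string concatenation):
from airflow.models import Variable

# Placeholder values; equivalently enter them under Admin -> Variables.
Variable.set("sourcePath", "/data/inbound/")   # trailing slash matters
Variable.set("filePattern", r".*\.txt$")       # regex the sensor matches against
Variable.set("archivePath", "/data/archive/")  # trailing slash matters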
from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator, TriggerDagRunOperator
from datetime import datetime, timedelta
from airflow.models import Variable
default_args = {
    'owner': 'glsam',
    'depends_on_past': False,
    'start_date': datetime(2017, 6, 26),
    'provide_context': True,
    'retries': 100,
    'retry_delay': timedelta(seconds=30)
}
task_name = 'my_first_file_sensor_task'
filepath = Variable.get("sourcePath")
filepattern = Variable.get("filePattern")
archivepath = Variable.get("archivePath")
dag = DAG(
    task_name,  # use the same id that the trigger task below re-runs
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    concurrency=1)
sensor_task = OmegaFileSensor(
    task_id=task_name,
    filepath=filepath,
    filepattern=filepattern,
    poke_interval=3,
    dag=dag)
def process_file(**context):
    file_to_process = context['task_instance'].xcom_pull(
        key='file_name', task_ids=task_name)
    file = open(filepath + file_to_process, 'w')
    file.write('This is a test\n')
    file.write('of processing the file')
    file.close()
process_task = PythonOperator(
    task_id='process_the_file', python_callable=process_file, dag=dag)
archive_task = ArchiveFileOperator(
    task_id='archive_file',
    filepath=filepath,
    task_name=task_name,
    archivepath=archivepath,
    dag=dag)
trigger = TriggerDagRunOperator(
    task_id='trigger_dag_rerun', trigger_dag_id=task_name, dag=dag)
sensor_task >> process_task >> archive_task >> trigger
And then this is my FileSensor (plus the ArchiveFileOperator), packaged as a plugin:
import os
import re
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator
class ArchiveFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, filepath, archivepath, task_name, *args, **kwargs):
        super(ArchiveFileOperator, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.archivepath = archivepath
        self.task_name = task_name

    def execute(self, context):
        file_name = context['task_instance'].xcom_pull(self.task_name, key='file_name')
        os.rename(self.filepath + file_name, self.archivepath + file_name)
class OmegaFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, filepattern, *args, **kwargs):
        super(OmegaFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.filepattern = filepattern

    def poke(self, context):
        full_path = self.filepath
        file_pattern = re.compile(self.filepattern)
        for file_name in os.listdir(full_path):
            if re.match(file_pattern, file_name):
                # push the matched name so downstream tasks can pick it up
                context['task_instance'].xcom_push('file_name', file_name)
                return True
        return False
class OmegaPlugin(AirflowPlugin):
    name = "omega_plugin"
    operators = [OmegaFileSensor, ArchiveFileOperator]
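To sanity-check the matching logic outside Airflow, the heart of poke() boils down to this (the directory and pattern are made up):
import os
import re

# Hypothetical directory and pattern, mirroring a single poke() pass.
pattern = re.compile(r".*\.txt$")
matches = [f for f in os.listdir("/data/inbound/") if pattern.match(f)]
print(matches[0] if matches else "no matching file yet")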