Can I trigger an Airflow task from a Cloud Function?
Basically my problem is this: files arrive in Google Cloud Storage, multiple files for the same DAG, and I need to trigger a transformation job when a file arrives. I was thinking of using a Cloud Function, but there are a lot of dependent jobs in my DAG.
Any help is appreciated.
You don't necessarily need a Cloud Function to sense for the file in GCS; Cloud Composer ships with GCS sensors that can fulfil the purpose.
Suppose you have to monitor files matching bucket/folder/file_*.csv, then:
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
import datetime as dt
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

lasthour = dt.datetime.now() - dt.timedelta(hours=1)

args = {
    'owner': 'airflow',
    'start_date': lasthour,
    'depends_on_past': False,
}

dag = DAG(
    dag_id='GCS_sensor_dag',
    schedule_interval=None,
    default_args=args,
)

# Wait until at least one object matching folder/file_* exists in the bucket.
file_sensor = GoogleCloudStoragePrefixSensor(
    task_id='gcs_polling',
    bucket='bucketname',
    prefix='folder/file_',
    dag=dag,
)

# Once the sensor succeeds, list the matching .csv objects.
GCS_File_list = GoogleCloudStorageListOperator(
    task_id='list_Files',
    bucket='bucketname',
    prefix='folder/file_',
    delimiter='.csv',
    google_cloud_storage_conn_id='google_cloud_default',
    dag=dag,
)

# Re-trigger this same DAG so it goes back to waiting for the next file.
# Note: use a static task_id here. Deriving it from datetime.now() (as in
# the original) changes the id on every scheduler parse, which breaks the
# task's identity in the DAG.
trigger = TriggerDagRunOperator(
    task_id='trigger_rerun',
    trigger_dag_id='GCS_sensor_dag',
    dag=dag,
)

file_sensor >> GCS_File_list >> trigger
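If you do want to trigger the DAG from a Cloud Function instead (as the question asks), one common pattern is a function on the GCS "finalize" event that POSTs to the Airflow webserver's experimental REST endpoint for creating DAG runs. The sketch below is a minimal illustration under stated assumptions: the webserver URL is a placeholder, and authentication is omitted entirely; a real Composer webserver sits behind Identity-Aware Proxy, so a production function must also obtain and send an IAP-signed token.

```python
# Hedged sketch: trigger a DAG from a Cloud Function via Airflow's
# experimental REST API. The webserver URL is a placeholder, and IAP
# authentication (required for a real Composer environment) is omitted.
import json


def build_trigger_request(airflow_web_url, dag_id, conf=None):
    """Build the URL and JSON body for the experimental trigger endpoint."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(airflow_web_url, dag_id)
    body = json.dumps({"conf": conf or {}})
    return url, body


def gcs_trigger(event, context):
    """Cloud Function entry point for a GCS object-finalize event (sketch)."""
    import requests  # available in the Cloud Functions Python runtime
    url, body = build_trigger_request(
        "https://YOUR-composer-webserver",   # placeholder, not a real URL
        "GCS_sensor_dag",
        conf={"file": event["name"]},        # pass the uploaded object name
    )
    requests.post(url, data=body, headers={"Content-Type": "application/json"})
```

The upside of this route is that the DAG only runs when a file actually lands; the downside is that the function must know about your Composer environment and handle auth, whereas the sensor-based DAG above stays entirely inside Airflow.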