 

Triggering a task in a Cloud Composer DAG on a file-arrival event from a Cloud Function

Can I trigger an Airflow task from a Cloud Function?

Basically, my problem is this: I have some files arriving in Google Cloud Storage, multiple files for the same DAG. I need to trigger a transformation job when a file arrives. I was thinking of using a Cloud Function, but there are a lot of dependent jobs in my DAG.

Any help is appreciated.

asked Apr 08 '26 11:04 by utp


1 Answer

You don't necessarily need a Cloud Function to detect the file in GCS; Composer provides GCS sensors that can serve this purpose.
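To make the sensor idea concrete, here is a simplified, plain-Python model of what a prefix sensor does. It is not Airflow's implementation: `wait_for_prefix` and the injected `list_objects` callable are hypothetical names, and only the default values (a 60-second poke interval and a 7-day timeout) mirror the defaults of Airflow's `BaseSensorOperator`. Each "poke" lists objects under the prefix and succeeds as soon as that list is non-empty.

```python
import time
from typing import Callable, List


def wait_for_prefix(
    list_objects: Callable[[str], List[str]],  # hypothetical: returns names under a prefix
    prefix: str,
    poke_interval: float = 60.0,         # seconds between pokes
    timeout: float = 7 * 24 * 60 * 60,   # give up after this many seconds
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> List[str]:
    """Poll until at least one object name starts with `prefix`.

    Simplified model of a prefix sensor: poke, and if nothing is found,
    sleep `poke_interval` seconds and poke again until `timeout` elapses.
    """
    started = clock()
    while True:
        found = list_objects(prefix)
        if found:  # any matching object means the sensor succeeds
            return found
        if clock() - started >= timeout:
            raise TimeoutError(f"no object with prefix {prefix!r} after {timeout}s")
        sleep(poke_interval)


# Usage with a fake bucket where a file "arrives" on the third poke.
bucket: List[str] = []
pokes = {"n": 0}


def fake_list(prefix: str) -> List[str]:
    pokes["n"] += 1
    if pokes["n"] == 3:
        bucket.append("folder/file_001.csv")
    return [name for name in bucket if name.startswith(prefix)]


print(wait_for_prefix(fake_list, "folder/file_", sleep=lambda s: None))
# ['folder/file_001.csv']
```

Injecting `sleep` and `clock` keeps the polling logic testable without real waiting; the real sensor does the listing through the GCS hook instead.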

Suppose you need to monitor files matching bucket/folder/file_*.csv. Then:

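    # airflow.contrib import paths (Airflow 1.10.x)
    import datetime as dt

    from airflow.models import DAG
    from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
    from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    # A start_date in the past; with schedule_interval=None the DAG only runs when triggered.
    lasthour = dt.datetime.now() - dt.timedelta(hours=1)

    args = {
        'owner': 'airflow',
        'start_date': lasthour,
        'depends_on_past': False,
    }

    dag = DAG(
        dag_id='GCS_sensor_dag',
        schedule_interval=None,  # no schedule: runs start only when triggered
        default_args=args,
    )

    # Wait until at least one object whose name starts with folder/file_ exists.
    file_sensor = GoogleCloudStoragePrefixSensor(
        task_id='gcs_polling',
        bucket='bucketname',
        prefix='folder/file_',
        dag=dag,
    )

    # List the matching .csv objects; the returned list is pushed to XCom
    # so downstream (transformation) tasks can read it.
    GCS_File_list = GoogleCloudStorageListOperator(
        task_id='list_Files',
        bucket='bucketname',
        prefix='folder/file_',
        delimiter='.csv',
        google_cloud_storage_conn_id='google_cloud_default',
        dag=dag,
    )

    # Re-trigger this same DAG so a new run starts waiting for the next file.
    # Use a static task_id: a timestamp-based ID changes on every DAG parse.
    # Note: nothing here moves or deletes the processed files, so the next
    # run's sensor will succeed immediately unless they are moved elsewhere.
    trigger = TriggerDagRunOperator(
        task_id='trigger_dag_rerun',
        trigger_dag_id='GCS_sensor_dag',
        dag=dag,
    )

    file_sensor >> GCS_File_list >> trigger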
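The `list_Files` task passes both `prefix` and `delimiter='.csv'`. In the GCS JSON API, when a delimiter is set, objects whose names (after the prefix) contain the delimiter are rolled up into `prefixes`, truncated just after the first occurrence, while the rest come back as `items`. As I understand the 1.10 hook, it returns the `prefixes` when any exist, which is why this effectively yields the `.csv` names. The function below is a hypothetical, offline model of those listing semantics, not a call into the GCS client; it also shows that a name like `file_004.csv.bak` surfaces truncated, so the delimiter is not a strict suffix filter.

```python
from typing import Iterable, List, Tuple


def list_with_delimiter(
    names: Iterable[str], prefix: str, delimiter: str
) -> Tuple[List[str], List[str]]:
    """Model GCS object listing with `prefix` and a non-empty `delimiter`.

    Returns (items, prefixes):
    - items: names under `prefix` whose remainder has no `delimiter`
    - prefixes: for names whose remainder contains `delimiter`, the name
      truncated just after its first occurrence, without duplicates
    """
    items: List[str] = []
    prefixes: List[str] = []
    for name in sorted(names):  # GCS lists names in lexicographic order
        if not name.startswith(prefix):
            continue
        rest = name[len(prefix):]
        idx = rest.find(delimiter)
        if idx == -1:
            items.append(name)
        else:
            rolled = name[: len(prefix) + idx + len(delimiter)]
            if rolled not in prefixes:
                prefixes.append(rolled)
    return items, prefixes


objects = [
    "folder/file_001.csv",
    "folder/file_002.csv",
    "folder/file_003.txt",
    "folder/file_004.csv.bak",
    "other/file_005.csv",
]
items, prefixes = list_with_delimiter(objects, "folder/file_", ".csv")
print(items)     # ['folder/file_003.txt']
print(prefixes)  # ['folder/file_001.csv', 'folder/file_002.csv', 'folder/file_004.csv']
```

If stray names like `*.csv.bak` can land under the prefix, filtering the listed names by suffix in the downstream task is a simple safeguard.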
answered Apr 11 '26 01:04 by Priya Agarwal