Run another DAG with TriggerDagRunOperator multiple times

I have a DAG (DAG1) that copies a bunch of files. I would then like to kick off another DAG (DAG2) for each file that was copied. As the number of files copied will vary per DAG1 run, I would like to essentially loop over the files and call DAG2 with the appropriate parameters.

eg:

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# RsyncOperator is a site-specific custom operator

with DAG( 'DAG1',
        description="copy files over",
        schedule_interval="* * * * *",
        max_active_runs=1
    ) as dag:


    t_rsync = RsyncOperator( task_id='rsync_data',
        source='/source/',
        target='/destination/' )

    t_trigger_preprocessing = TriggerDagRunOperator( task_id='trigger_preprocessing',
        trigger_dag_id='DAG2',
        python_callable=trigger
    )

    t_rsync >> t_trigger_preprocessing

I was hoping to use the python_callable trigger to pull the relevant XCom data from t_rsync and then trigger DAG2, but it's not clear to me how to do this.

I would prefer to put the logic of calling DAG2 here, to simplify the contents of DAG2 (and also get the stacking semantics of max_active_runs).
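To illustrate the mechanism in question, here is a minimal sketch (not actual Airflow API) of how a python_callable could read an upstream task's XCom from the context and build one trigger payload per file. The `FakeTI` class and the `copied_files` key are illustrative assumptions standing in for the real TaskInstance and whatever key t_rsync pushes:

```python
def build_payloads(context):
    # Pull the list of files the upstream rsync task pushed to XCom
    # ('copied_files' is a hypothetical key for this sketch).
    files = context['ti'].xcom_pull(task_ids='rsync_data', key='copied_files')
    return [{'file': f} for f in files]

# Stand-in for the TaskInstance XCom interface, for demonstration only.
class FakeTI:
    def __init__(self, xcoms):
        self._xcoms = xcoms
    def xcom_pull(self, task_ids=None, key=None):
        return self._xcoms[(task_ids, key)]

context = {'ti': FakeTI({('rsync_data', 'copied_files'): ['a.dat', 'b.dat']})}
print(build_payloads(context))  # → [{'file': 'a.dat'}, {'file': 'b.dat'}]
```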

asked Nov 03 '17 by yee379



1 Answer

I ended up writing my own operator:

from airflow import settings
from airflow.exceptions import AirflowSkipException
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.db import create_session
from airflow.utils.state import State

class TriggerMultipleDagRunOperator(TriggerDagRunOperator):
    """Trigger one DagRun of trigger_dag_id per DagRunOrder yielded by python_callable."""
    def execute(self, context):
        count = 0
        for dro in self.python_callable(context):
            if dro:
                with create_session() as session:
                    dbag = DagBag(settings.DAGS_FOLDER)
                    trigger_dag = dbag.get_dag(self.trigger_dag_id)
                    dr = trigger_dag.create_dagrun(
                        run_id=dro.run_id,
                        state=State.RUNNING,
                        conf=dro.payload,
                        external_trigger=True)
                    session.add(dr)
                    session.commit()
                    count += 1
            else:
                self.log.info("Criteria not met, moving on")
        if count == 0:
            raise AirflowSkipException('No external dags triggered')

with a python_callable like:

from datetime import datetime

def trigger_preprocessing(context):
    # 'found' is assumed to be in scope here: a mapping of the files
    # discovered/copied by the upstream task.
    for base_filename, _ in found.items():
        exp = context['ti'].xcom_pull(task_ids='parse_config', key='experiment')
        run_id = '%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat())
        dro = DagRunOrder(run_id=run_id)
        d = {
            'directory': context['ti'].xcom_pull(task_ids='parse_config', key='experiment_directory'),
            'base': base_filename,
            'experiment': exp['name'],
        }
        LOG.info('triggering dag %s with %s' % (run_id, d))
        dro.payload = d
        yield dro

and then tie it all together with:

t_trigger_preprocessing = TriggerMultipleDagRunOperator( task_id='trigger_preprocessing',
    trigger_dag_id='preprocessing',
    python_callable=trigger_preprocessing
)
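The core of the pattern is that the operator consumes whatever DagRunOrder-like objects the callable yields and fires one run per item. A minimal sketch of that generator/consumer shape, using stand-in classes rather than real Airflow imports:

```python
# Stand-in for Airflow's DagRunOrder, for illustration only.
class DagRunOrder:
    def __init__(self, run_id):
        self.run_id = run_id
        self.payload = None

def make_orders(files):
    # One order per file, mirroring trigger_preprocessing above;
    # the run_id scheme here is illustrative.
    for i, f in enumerate(files):
        dro = DagRunOrder(run_id='preprocessing__%d' % i)
        dro.payload = {'base': f}
        yield dro

# The operator side simply iterates and "triggers" each order.
triggered = [(dro.run_id, dro.payload) for dro in make_orders(['a.mrc', 'b.mrc'])]
print(triggered)
```

Because the callable is a generator, nothing is triggered when it yields nothing, which is exactly the case the operator turns into an AirflowSkipException.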
answered Sep 29 '22 by yee379