I have a DAG (DAG1) where I copy a bunch of files. I would then like to kick off another DAG (DAG2) for each file that was copied. As the number of files copied will vary per DAG1 run, I would like to essentially loop over the files and call DAG2 with the appropriate parameters.
e.g.:
with DAG(
    'DAG1',
    description="copy files over",
    schedule_interval="* * * * *",
    max_active_runs=1,
) as dag:
    t_rsync = RsyncOperator(
        task_id='rsync_data',
        source='/source/',
        target='/destination/',
    )
    t_trigger_preprocessing = TriggerDagRunOperator(
        task_id='trigger_preprocessing',
        trigger_dag_id='DAG2',
        python_callable=trigger,
    )
    t_rsync >> t_trigger_preprocessing
I was hoping to use the python_callable trigger to pull the relevant XCom data from t_rsync and then trigger DAG2, but it's not clear to me how to do this.
I would prefer to put the logic of calling DAG2 here to simplify the contents of DAG2 (and also to provide stacking semantics with max_active_runs).
In order to use the SimpleHttpOperator to trigger another DAG, you need to define the following:
endpoint: This should be of the form '/api/v1/dags/<dag-id>/dagRuns', where <dag-id> is the ID of the DAG you want to trigger.
data: To trigger a DAG Run using this endpoint, you must provide an execution date.
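As a rough illustration of the endpoint and data described above, here is a minimal sketch that builds one request per copied file. The helper name build_dag_run_request, the 'filename' key inside conf, and the example file names are hypothetical; the body fields assume the Airflow 2.x stable REST API, where newer releases call the date field logical_date instead of execution_date.

```python
import json
from datetime import datetime, timezone


def build_dag_run_request(dag_id, filename, when=None):
    """Return (endpoint, body) for triggering `dag_id` once for `filename`.

    Hypothetical helper: the 'filename' key in conf is an illustrative choice.
    """
    when = when or datetime.now(timezone.utc)
    endpoint = "/api/v1/dags/{}/dagRuns".format(dag_id)
    body = {
        # Assumed field name; later Airflow versions use "logical_date".
        "execution_date": when.isoformat(),
        # Parameters handed to the triggered DAG run.
        "conf": {"filename": filename},
    }
    return endpoint, json.dumps(body)


# One (endpoint, body) pair per copied file; the file names are made up.
requests_to_send = [build_dag_run_request("DAG2", name) for name in ["a.tif", "b.tif"]]
```

Each pair could then be passed as the endpoint and data of a SimpleHttpOperator, or sent with any HTTP client.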
EDIT: Easy workaround if you absolutely need them in separate DAGs is to create multiple TriggerDagRunOperators and set their dependencies to the same task.
This post has shown how to create those dependencies even if you don't control the upstream DAGs: add a new DAG that relies on using the ExternalTaskSensor (one sensor per upstream DAG), encode the dependencies between the DAGs as dependencies between the sensor tasks, run the DAG encoding the dependencies in the same ...
According to the official Airflow docs, the task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task).
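I ended up writing my own operator:

# Import paths as in Airflow 1.10.x
from airflow import settings
from airflow.exceptions import AirflowSkipException
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.db import create_session
from airflow.utils.state import State


class TriggerMultipleDagRunOperator(TriggerDagRunOperator):

    def execute(self, context):
        count = 0
        # python_callable is a generator yielding one DagRunOrder per run to trigger
        for dro in self.python_callable(context):
            if dro:
                with create_session() as session:
                    dbag = DagBag(settings.DAGS_FOLDER)
                    trigger_dag = dbag.get_dag(self.trigger_dag_id)
                    dr = trigger_dag.create_dagrun(
                        run_id=dro.run_id,
                        state=State.RUNNING,
                        conf=dro.payload,
                        external_trigger=True)
                    session.add(dr)
                    session.commit()
                    count = count + 1
            else:
                self.log.info("Criteria not met, moving on")
        # Mark the task as skipped when nothing was triggered
        if count == 0:
            raise AirflowSkipException('No external dags triggered')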
with a python_callable like:

import logging
from datetime import datetime
from airflow.operators.dagrun_operator import DagRunOrder

LOG = logging.getLogger(__name__)


def trigger_preprocessing(context):
    # `found` is not defined in this snippet; it is assumed to be a dict
    # keyed by the base filename of each file to process.
    for base_filename, _ in found.items():
        exp = context['ti'].xcom_pull(task_ids='parse_config', key='experiment')
        run_id = '%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat())
        dro = DagRunOrder(run_id=run_id)
        d = {
            'directory': context['ti'].xcom_pull(task_ids='parse_config', key='experiment_directory'),
            'base': base_filename,
            'experiment': exp['name'],
        }
        LOG.info('triggering dag %s with %s' % (run_id, d))
        dro.payload = d
        yield dro
and then tie it all together with:

t_trigger_preprocessing = TriggerMultipleDagRunOperator(
    task_id='trigger_preprocessing',
    trigger_dag_id='preprocessing',
    python_callable=trigger_preprocessing,
)
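One thing to watch with the run_id built in trigger_preprocessing: it combines the microscope name with a timestamp truncated to whole seconds, so two files handled within the same second would get the same run_id, and Airflow expects run_id to be unique within a DAG. A possible tweak, sketched here with a hypothetical helper make_run_id and made-up example names, is to include the base filename in the run_id:

```python
from datetime import datetime, timezone


def make_run_id(prefix, base_filename, now=None):
    """Build a run_id that stays distinct for files handled in the same second.

    Hypothetical helper: the '<prefix>__<file>__<timestamp>' layout is an
    illustrative choice, not something Airflow requires.
    """
    if now is None:
        # Naive UTC timestamp, matching the format used in the answer.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    stamp = now.replace(microsecond=0).isoformat()
    return "%s__%s__%s" % (prefix, base_filename, stamp)


# Two files processed at the same instant still get different run_ids.
now = datetime(2020, 1, 1, 12, 0, 0)
run_ids = [make_run_id("scope1", name, now) for name in ["file_a", "file_b"]]
```

Inside trigger_preprocessing this would replace the run_id line with something like make_run_id(exp['microscope'], base_filename).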