I have a scenario wherein a particular dag upon completion needs to trigger multiple dags,have used TriggerDagRunOperator to trigger single dag,is it possible to pass multiple dags to the TriggerDagRunOperator to trigger multiple dags?
And is it possible to trigger only upon successful completion of the current dag.
Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks.
We can use the BranchPythonOperator to define two code execution paths, choose the first one during regular operation, and the other path in case of an error. In the other branch, we can trigger another DAG using the trigger operator.
If you need to wait for the previous DAG to complete first, consider using ExternalTaskSensor instead of TriggerDagRunOperator. This operator will wait until the another DAG (or another DAG's task) is complete with a specified status (default is "success") until moving forward.
I have faced the same problem. And there is no solution out of the box, but we can write a custom operator for it.
So here the code of a custom operator, that get python_callable
and trigger_dag_id
as arguments:
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
session = settings.Session()
created = False
for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
if not dro or not isinstance(dro, DagRunOrder):
break
if dro.run_id is None:
dro.run_id = 'trig__' + datetime.utcnow().isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True
)
created = True
self.log.info("Creating DagRun %s", dr)
if created is True:
session.commit()
else:
self.log.info("No DagRun created")
session.close()
trigger_dag_id
is dag id what we want running multiple times.
python_callable
is a function, it should return a list of DagRunOrder
objects, one object for schedule one instance of DAG with dag_id trigger_dag_id
.
Code and examples on GitHub: https://github.com/mastak/airflow_multi_dagrun Little bit more description about this code: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With