I need to have several identical top-level DAGs (differing only in arguments) that can also be triggered together, with the following constraints / assumptions:

- `schedule_interval=None`, as they will only need occasional manual triggering

So I created one file for each DAG in my `dags` directory, and now I must wire them up for sequential execution. I have identified two ways this could be done:
- `SubDagOperator`: a SubDag's `dag_id` must be prefixed by its parent's, which would force absurd IDs on top-level DAGs that are supposed to be functionally independent too
- `TriggerDagRunOperator`: `ExternalTaskSensor` might help overcome the above limitation, but it would make things very messy

My questions are:
- How to overcome the limitation of the `parent_id` prefix in the `dag_id` of `SubDag`s?
- How to force `TriggerDagRunOperator`s to await completion of the DAG?

I'm using puckel/docker-airflow with:

- Airflow 1.9.0-4
- Python 3.6-slim
- CeleryExecutor with redis:3.2.7
EDIT-1
Clarifying @Viraj Parekh's queries
Can you give some more detail on what you mean by awaiting completion of the DAG before getting triggered?
When I trigger the `import_parent_v1` DAG, all 3 external DAGs that it is supposed to fire using `TriggerDagRunOperator` start running in parallel, even though I chained them sequentially. Actually, the logs indicate that while they are fired one after another, execution moves on to the next DAG (`TriggerDagRunOperator`) before the previous one has finished.
NOTE: In this example, the top-level DAGs are named `importer_child_v1_db_X` and their corresponding `task_id`s (for the `TriggerDagRunOperator`s) are named `importer_v1_db_X`
Would it be possible to just have the TriggerDagRunOperator be the last task in a DAG?
I have to chain several similar DAGs (differing only in arguments) together in a workflow that triggers them one by one. So there isn't just one `TriggerDagRunOperator` that I could put last; there are many (3 here, but up to 15 in production).
Triggering a DAG can be accomplished from any other DAG, so long as you have the `dag_id` of the DAG you want to trigger. This can be achieved through the `TriggerDagRunOperator`.
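For reference, here is a minimal Airflow 1.9-style sketch of such a trigger. The `dag_id`s match the example above; the callable name and start date are illustrative (in 1.9, `python_callable` is required and must return the `dag_run_obj` for the trigger to proceed):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG('import_parent_v1', schedule_interval=None,
          start_date=datetime(2018, 1, 1))


def approve_trigger(context, dag_run_obj):
    # returning the dag_run_obj tells the operator to go ahead with the trigger
    return dag_run_obj


trigger = TriggerDagRunOperator(
    task_id='importer_v1_db_1',
    trigger_dag_id='importer_child_v1_db_1',  # dag_id of the DAG to fire
    python_callable=approve_trigger,
    dag=dag,
)
```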
Taking hints from @Viraj Parekh's answer, I was able to make `TriggerDagRunOperator` work in the intended fashion. I'm hereby posting my (partial) answer; will update as and when things become clear.
How to overcome the limitation of the `parent_id` prefix in the `dag_id` of `SubDag`s?
As @Viraj said, there's no straightforward way of achieving this. Extending `SubDagOperator` to remove this check might work, but I decided to steer clear of it.
How to force `TriggerDagRunOperator`s to await completion of the DAG?
Looking at the implementation, it becomes clear that the job of `TriggerDagRunOperator` is just to trigger the external DAG, and that's about it. By default, it is not supposed to wait for the DAG's completion, so the behaviour I'm observing is understandable.
`ExternalTaskSensor` is the obvious way out. However, while learning the basics of Airflow, I was relying on manual triggering of DAGs (`schedule_interval=None`). In that case, `ExternalTaskSensor` makes it difficult to accurately specify the `execution_date` for the external task (whose completion is being awaited), failing which the sensor gets stuck.
So, taking a hint from the implementation, I made a minor adjustment to the behaviour of `ExternalTaskSensor`: instead of sensing one particular `execution_date`, it awaits completion of all `task_instance`s of the concerned task having

`execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta`
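A minimal sketch of that adjustment, assuming Airflow 1.9 internals (`airflow.operators.sensors.ExternalTaskSensor`, `TaskInstance`, `settings.Session`); the class name and the exact query are illustrative, not the exact code I deployed:

```python
from datetime import timedelta

from airflow import settings
from airflow.models import TaskInstance
from airflow.operators.sensors import ExternalTaskSensor


class AwaitCompletionSensor(ExternalTaskSensor):  # hypothetical name
    """Succeeds once at least one run of the external task exists at or
    after the cutoff execution_date and all such runs have completed."""

    def poke(self, context):
        # earliest execution_date of the external task that we care about
        cutoff = context['execution_date'] + (self.execution_delta or timedelta(0))

        session = settings.Session()
        TI = TaskInstance
        base = session.query(TI).filter(
            TI.dag_id == self.external_dag_id,
            TI.task_id == self.external_task_id,
            TI.execution_date >= cutoff,
        )
        total = base.count()  # all matching runs of the external task
        done = base.filter(TI.state.in_(self.allowed_states)).count()
        session.commit()
        session.close()

        # succeed only when at least one run exists and all have completed
        return 0 < total == done
```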
This achieves the desired result: the external DAGs run one after the other, in sequence.
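Wired up in the parent DAG, trigger and sensor tasks then alternate; a sketch using the names from the question and the hypothetical sensor subclass above:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# AwaitCompletionSensor: the hypothetical subclass sketched above

dag = DAG('import_parent_v1', schedule_interval=None,
          start_date=datetime(2018, 1, 1))

previous = None
for db in ['db_1', 'db_2', 'db_3']:
    trigger = TriggerDagRunOperator(
        task_id='importer_v1_{}'.format(db),
        trigger_dag_id='importer_child_v1_{}'.format(db),
        python_callable=lambda context, dag_run_obj: dag_run_obj,
        dag=dag,
    )
    wait = AwaitCompletionSensor(
        task_id='wait_importer_v1_{}'.format(db),
        external_dag_id='importer_child_v1_{}'.format(db),
        external_task_id='import_{}'.format(db),  # hypothetical child task_id
        dag=dag,
    )
    trigger >> wait            # don't move on until this child DAG is done
    if previous is not None:
        previous >> trigger    # chain the next trigger behind the previous wait
    previous = wait
```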
Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?
Again, going by @Viraj's pointer, this can be done by assigning DAGs to the global scope using `globals()[dag_id] = DAG(..)`
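A minimal single-file sketch of that pattern; the database list and the placeholder task are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# hypothetical inputs; the only thing that differs between the DAGs
DATABASES = ['db_1', 'db_2', 'db_3']

for db in DATABASES:
    dag_id = 'importer_child_v1_{}'.format(db)
    dag = DAG(dag_id, schedule_interval=None,
              start_date=datetime(2018, 1, 1))
    # placeholder for the actual import task(s), parameterised by `db`
    DummyOperator(task_id='import_{}'.format(db), dag=dag)
    # assigning to global scope lets the DagBag discover each DAG
    globals()[dag_id] = dag
```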
EDIT-1
Maybe I was referring to an incorrect resource (the link above is already dead), but `ExternalTaskSensor` already includes the params `execution_delta` & `execution_date_fn` to easily restrict the `execution_date`(s) of the task being sensed.
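For example, a sketch using the stock sensor (the 5-minute offset and task ids are illustrative; note that the stock sensor looks at this run's `execution_date` minus `execution_delta`):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG('import_parent_v1', schedule_interval=None,
          start_date=datetime(2018, 1, 1))

# waits for the external task instance whose execution_date equals
# this run's execution_date minus execution_delta
wait = ExternalTaskSensor(
    task_id='wait_importer_v1_db_1',
    external_dag_id='importer_child_v1_db_1',
    external_task_id='import_db_1',        # hypothetical task_id
    execution_delta=timedelta(minutes=5),  # illustrative offset
    dag=dag,
)
```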