Let's say we have two DAGs, dag1 and dag2, that serve different business requirements and are completely unrelated, but it is more important for dag1 to finish as early as possible.
For simplicity, each has only one task and both run daily.
In a scenario where dag1 is 2 or 3 days behind schedule, I want to make sure that dag1 runs and completes its DAG runs first, i.e. that dag1 is brought up to date, and only then is dag2 able to proceed.
I tried priority_weight, but it doesn't work across different DAGs.
I need a way of putting the tasks from both DAGs in the same queue and achieving DAG-level prioritization.
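To make the requirement concrete, here is a toy model of the ordering being asked for, assuming a single shared queue in which every pending dag1 run outranks every pending dag2 run and, within one DAG, older execution dates go first. The `DAG_RANK` mapping and the `drain_in_priority_order` helper are hypothetical; this is plain Python with `heapq`, not a description of how Airflow's scheduler orders tasks.

```python
import heapq
from datetime import date

# Hypothetical DAG-level ranking: a lower number means a higher priority.
DAG_RANK = {"dag1": 0, "dag2": 1}


def drain_in_priority_order(pending):
    """Return pending (dag_id, execution_date) pairs in the order a single
    queue with DAG-level priority would release them."""
    # Sort key: DAG rank first, then execution date (oldest first).
    heap = [(DAG_RANK[dag_id], run_date, dag_id) for dag_id, run_date in pending]
    heapq.heapify(heap)
    order = []
    while heap:
        _, run_date, dag_id = heapq.heappop(heap)
        order.append((dag_id, run_date))
    return order


if __name__ == "__main__":
    backlog = [
        ("dag2", date(2024, 1, 1)),
        ("dag1", date(2024, 1, 3)),
        ("dag1", date(2024, 1, 1)),
        ("dag2", date(2024, 1, 2)),
        ("dag1", date(2024, 1, 2)),
    ]
    for dag_id, run_date in drain_in_priority_order(backlog):
        print(dag_id, run_date)
```

With this backlog, all three dag1 runs come out before either dag2 run.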
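Two related scheduler settings: parallelism (default 32) caps how many task instances can run concurrently per scheduler, regardless of worker count, and max_active_tasks_per_dag (formerly dag_concurrency) determines the maximum number of tasks that can be scheduled at once, per DAG.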
From the official documentation for the External Task Sensor:
Waits for a different DAG or a task in a different DAG to complete for a specific execution_date.
:param external_dag_id: The dag_id that contains the task you want to wait for
:type external_dag_id: str
:param external_task_id: The task_id that contains the task you want to wait for. If ``None`` the sensor waits for the DAG
:type external_task_id: str
:param allowed_states: list of allowed states, default is ``['success']``
:type allowed_states: list
:param execution_delta: time difference with the previous execution to look at, the default is the same execution_date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
:param check_existence: Set to `True` to check if the external task exists (when external_task_id is not None) or check if the DAG to wait for exists (when external_task_id is None), and immediately cease waiting if the external task or DAG does not exist (default value: False).
:type check_existence: bool
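The two ways of choosing which external run to look at can be illustrated with a small stand-in function. `dates_to_query` is hypothetical, not part of Airflow; it only mirrors the documented rules: with neither argument the sensor checks the same execution_date, a positive execution_delta is subtracted (so `timedelta(days=1)` means yesterday), execution_date_fn may return one date or several, and passing both is rejected.

```python
from datetime import datetime, timedelta


def dates_to_query(execution_date, execution_delta=None, execution_date_fn=None):
    """Hypothetical helper mirroring the ExternalTaskSensor docstring rules."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("Pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        # A positive delta looks back in time: today - 1 day = yesterday.
        return [execution_date - execution_delta]
    if execution_date_fn is not None:
        result = execution_date_fn(execution_date)
        # Accept either a single datetime or a list/tuple of datetimes.
        return list(result) if isinstance(result, (list, tuple)) else [result]
    # Default: the same execution_date as the current task or DAG.
    return [execution_date]


if __name__ == "__main__":
    today = datetime(2024, 1, 10)
    print(dates_to_query(today))
    print(dates_to_query(today, execution_delta=timedelta(days=1)))
    # e.g. wait for the two previous daily runs of the other DAG
    print(dates_to_query(
        today,
        execution_date_fn=lambda d: [d - timedelta(days=2), d - timedelta(days=1)],
    ))
```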
Both DAGs should have depends_on_past set to True on their tasks (for example through default_args), so that a newer scheduled run only executes if the previous scheduled run has completed successfully.
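The effect of depends_on_past on a backlog can be pictured with a toy model of one daily, single-task DAG. `next_runnable` is a hypothetical helper, not Airflow code; it assumes dates missing from `states` have not run yet and, as Airflow documents for depends_on_past, treats a previous run that succeeded or was skipped as satisfying the dependency, while the very first run in the window may always go.

```python
from datetime import date, timedelta
from typing import Dict, Optional

# Previous-run states that satisfy depends_on_past.
OK_STATES = {"success", "skipped"}


def next_runnable(states: Dict[date, str], start: date, end: date) -> Optional[date]:
    """Toy model of depends_on_past=True for a daily, single-task DAG.

    Returns the oldest date in [start, end] that has not run yet, provided the
    previous day's run is in OK_STATES; the run for `start` has no predecessor
    and may always go. Returns None if the backlog is blocked or fully done.
    """
    day = start
    while day <= end:
        if day not in states:
            previous = day - timedelta(days=1)
            if day == start or states.get(previous) in OK_STATES:
                return day
            return None  # blocked: the previous run did not succeed
        day += timedelta(days=1)
    return None
```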
Then add an ExternalTaskSensor that waits for dag1 at the beginning of dag2 (the one that executes later).
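Put together, the setup behaves roughly like the following toy simulation, assuming unlimited worker slots, runs that always succeed, and one scheduling round per tick. `simulate_catch_up` is hypothetical and only models the two rules: each DAG works through its backlog in date order (depends_on_past), and dag2's run for a date cannot finish until dag1's run for the same date has succeeded (the sensor with its default, same-date execution_date).

```python
def simulate_catch_up(days):
    """Return (tick, dag_id, day) completion events for dag1 and dag2."""
    done = {"dag1": set(), "dag2": set()}
    events = []
    tick = 0
    while len(done["dag2"]) < len(days):
        tick += 1
        finished = []
        for dag_id in ("dag1", "dag2"):
            pending = [d for d in days if d not in done[dag_id]]
            if not pending:
                continue
            day = pending[0]  # depends_on_past: strictly the oldest pending day
            if dag_id == "dag2" and day not in done["dag1"]:
                continue  # the sensor is still waiting on dag1 for this day
            finished.append((dag_id, day))
        # Apply this tick's results only after both DAGs have been checked.
        for dag_id, day in finished:
            done[dag_id].add(day)
            events.append((tick, dag_id, day))
    return events


if __name__ == "__main__":
    for event in simulate_catch_up(["2024-01-01", "2024-01-02", "2024-01-03"]):
        print(event)
```

In this model dag2's run for a given date never finishes before dag1's run for that same date; dag2 trails dag1 by one tick as both work through the backlog.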
Alternatively, you could write your own custom sensor, made available through an Airflow plugin, that checks the metadatabase for the status of DAG runs.
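The core check such a custom sensor would perform can be sketched against an in-memory SQLite table. The `dag_run` table here is a simplified, hypothetical stand-in for Airflow's metadata table (the real one has more columns, and column names differ between Airflow versions). In an actual plugin the check would sit in the `poke()` method of a `BaseSensorOperator` subclass, which returns True once the condition holds, and would more likely go through Airflow's ORM (for example `DagRun.find`) than raw SQL.

```python
import sqlite3


def dag_run_succeeded(conn: sqlite3.Connection, dag_id: str, execution_date: str) -> bool:
    """Return True if the run of `dag_id` for `execution_date` is in state 'success'."""
    row = conn.execute(
        "SELECT state FROM dag_run WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    return row is not None and row[0] == "success"


def make_demo_db() -> sqlite3.Connection:
    """Build a throwaway database with a simplified dag_run table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT)")
    conn.executemany(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        [
            ("dag1", "2024-01-01", "success"),
            ("dag1", "2024-01-02", "running"),
        ],
    )
    return conn


if __name__ == "__main__":
    conn = make_demo_db()
    for day in ("2024-01-01", "2024-01-02", "2024-01-03"):
        print(day, dag_run_succeeded(conn, "dag1", day))
```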
You could also build custom sensors that utilise either Airflow XComs or Airflow Variables to pass execution run times, or any other Airflow macro, to a sensor in dag2.
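One way the Variable-based variant could work: dag1's last task records the newest date it has completed, and dag2's sensor proceeds once that recorded date has reached its own date. The sketch below uses a plain dict as a stand-in for Airflow Variables (in Airflow this would be something like `Variable.set` and `Variable.get`). The key name is hypothetical, dates are ISO `YYYY-MM-DD` strings so that string comparison matches date order, and it assumes dag1 finishes its dates in order (depends_on_past), so the recorded date implies all earlier dates are done.

```python
# Hypothetical variable name shared by both DAGs.
KEY = "dag1_last_completed_ds"


def record_dag1_completion(store: dict, ds: str) -> None:
    """Final step of dag1: remember the newest date that has completed."""
    current = store.get(KEY)
    # ISO dates (YYYY-MM-DD) sort correctly as plain strings.
    if current is None or ds > current:
        store[KEY] = ds


def dag2_may_proceed(store: dict, ds: str) -> bool:
    """Sensor condition in dag2: dag1 has completed at least this date."""
    last = store.get(KEY)
    return last is not None and last >= ds


if __name__ == "__main__":
    variables = {}  # stand-in for the Airflow Variable store
    record_dag1_completion(variables, "2024-01-01")
    print(dag2_may_proceed(variables, "2024-01-02"))  # dag1 has not reached this date yet
    record_dag1_completion(variables, "2024-01-02")
    print(dag2_may_proceed(variables, "2024-01-02"))
```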