 

How to set priority across different DAGs in Airflow

Let's say we have two DAGs, dag1 and dag2, that serve different business requirements and are completely unrelated. However, it is more important for dag1 to finish as early as possible.
For simplicity, they both have only one task and they run daily.

Suppose dag1 falls 2 or 3 days behind schedule. In that case I want dag1 to run and complete its pending DAG runs first, i.e. dag1 should be fully up to date before dag2 is allowed to proceed.

I tried priority_weight, but it doesn't work across different DAGs.

I need a way of putting the tasks from both DAGs in the same queue and achieving DAG-level prioritization.

asked Jun 02 '19 01:06 by Fadi Bakoura




1 Answer

From the official documentation for the External Task Sensor:

Waits for a different DAG or a task in a different DAG to complete for
a specific execution_date.

    :param external_dag_id: The dag_id that contains the task you want to
        wait for
    :type external_dag_id: str
    :param external_task_id: The task_id that contains the task you want to
        wait for. If ``None`` the sensor waits for the DAG
    :type external_task_id: str
    :param allowed_states: list of allowed states, default is ``['success']``
    :type allowed_states: list
    :param execution_delta: time difference with the previous execution to
        look at, the default is the same execution_date as the current task or DAG.
        For yesterday, use [positive!] datetime.timedelta(days=1). Either
        execution_delta or execution_date_fn can be passed to
        ExternalTaskSensor, but not both.
    :type execution_delta: datetime.timedelta
    :param execution_date_fn: function that receives the current execution date
        and returns the desired execution dates to query. Either execution_delta
        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
    :type execution_date_fn: callable
    :param check_existence: Set to `True` to check if the external task exists (when
        external_task_id is not None) or check if the DAG to wait for exists (when
        external_task_id is None), and immediately cease waiting if the external task
        or DAG does not exist (default value: False).
    :type check_existence: bool
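As an illustration of `execution_date_fn`, here is a hypothetical callable (the name `dag1_dates_to_check` is made up) that would make a sensor in dag2 require dag1's runs for the previous day and for the same logical date. It assumes both DAGs run on the same daily schedule. In Airflow 2.x the sensor only succeeds when every returned date has a matching run in one of the `allowed_states`, so returning a list acts as "all of these must be done". Passing `execution_delta=timedelta(days=1)` instead would check only the previous day.

```python
from datetime import datetime, timedelta


def dag1_dates_to_check(execution_date):
    """Hypothetical execution_date_fn for an ExternalTaskSensor placed in dag2.

    Receives dag2's current execution date and returns the dag1 execution
    dates the sensor should wait for: the previous day and the same day.
    """
    return [execution_date - timedelta(days=1), execution_date]


# For dag2's 2019-06-02 run, the sensor would query dag1's 2019-06-01 and 2019-06-02 runs.
print(dag1_dates_to_check(datetime(2019, 6, 2)))
```

It would be wired up as `ExternalTaskSensor(..., execution_date_fn=dag1_dates_to_check)`, without also passing `execution_delta`.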

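The tasks in both DAGs should have depends_on_past set to True (this is a task argument, usually passed through default_args, not a trigger rule), so that a task in a newer scheduled DAG run only executes once the same task in the previous scheduled run has completed successfully.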

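Then add an ExternalTaskSensor pointing at dag1 as the first task of dag2 (the DAG that should run later). By default the sensor waits for the dag1 run with the same execution_date, so both DAGs should share the same schedule, or you should pass execution_delta or execution_date_fn to line the dates up.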

Alternatively, you could write your own custom sensor that queries the Airflow metadata database for the status of dag1's DAG runs, and make it available through Airflow Plugins.

You could also build custom sensors that use either Airflow XComs or Airflow Variables to pass execution run times, or any other Airflow macro, to a sensor in dag2.

answered Sep 29 '22 22:09 by Meghdeep Ray