
Airflow TriggerDagRunOperator: how to change the execution date

I noticed that for scheduled tasks the execution date is set in the past, according to:

Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if I want to summarize data for 2016-02-19, I would do it at 2016-02-20 midnight GMT, which would be right after all data for 2016-02-19 becomes available.
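The convention in the quote can be modelled in plain Python: a run labelled with execution date D covers the interval starting at D and is only started once that interval has ended. This is a simplified sketch assuming a fixed `timedelta` schedule; `fires_at` is a hypothetical helper, not part of Airflow.

```python
from datetime import datetime, timedelta


def fires_at(execution_date: datetime, schedule_interval: timedelta) -> datetime:
    """Return when a scheduled run stamped `execution_date` actually starts.

    Simplified model: the run processes the interval that *starts* at
    execution_date, so it can only start once that interval is over.
    """
    return execution_date + schedule_interval


daily = timedelta(days=1)
print(fires_at(datetime(2016, 2, 19), daily))  # 2016-02-20 00:00:00
```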

However, when a DAG triggers another DAG, the execution date of the triggered run is set to now().

Is there a way to give the triggered DAG run the same execution date as the triggering DAG? Of course, I could rewrite the template to use yesterday_ds, but that is a tricky solution.
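For reference, Airflow's `yesterday_ds` macro is the execution date minus one day, formatted as `YYYY-MM-DD`. The plain-Python stand-in below (the helper name is hypothetical, this is not Airflow code) illustrates why the workaround is fragile: it only recovers the parent's date when the triggered run happens to start on the day after that date.

```python
from datetime import datetime, timedelta


def yesterday_ds(execution_date: datetime) -> str:
    # Mirrors the meaning of Airflow's yesterday_ds macro: the day before
    # the run's execution date, as a YYYY-MM-DD string.
    return (execution_date - timedelta(days=1)).strftime("%Y-%m-%d")


# The parent run for 2016-02-19 fires shortly after midnight on 2016-02-20,
# and the triggered run is stamped with that "now".
print(yesterday_ds(datetime(2016, 2, 20, 0, 5)))  # 2016-02-19

# A catch-up run for the same parent date, triggered days later,
# no longer lines up with the parent's date.
print(yesterday_ds(datetime(2016, 2, 23, 9, 0)))  # 2016-02-22
```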

asked Dec 14 '17 at 08:12 by ozw1z5rd


People also ask

What is execution date in Airflow?

Its execution date therefore also falls on the day it is triggered, because it is scheduled at minute 50 of each hour. In Airflow, @hourly corresponds to 0 * * * *, so its schedule is similar: it is triggered at minute 0 of each hour, yet in the documentation its execution date is given as 2016-01-01.

Is it possible to wait for the DAG triggered by the TriggerDagRunOperator to complete before executing the next task?

If you need to wait for the previous DAG to complete first, consider using ExternalTaskSensor instead of TriggerDagRunOperator. This operator waits until another DAG (or another DAG's task) has completed with a specified status (by default "success") before moving forward.
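Conceptually, a sensor re-checks ("pokes") the state of the other DAG's run at a fixed interval until it reaches an allowed state, or gives up after a timeout. The sketch below models only that polling loop; `wait_for_external` and its `get_state` callable are hypothetical, and although the parameter names echo the sensor's options, this is not Airflow's implementation.

```python
import time
from typing import Callable, Iterable


def wait_for_external(
    get_state: Callable[[], str],
    allowed_states: Iterable[str] = ("success",),
    poke_interval: float = 1.0,
    timeout: float = 10.0,
) -> bool:
    """Poll get_state() until it returns an allowed state or the timeout expires."""
    allowed = set(allowed_states)
    deadline = time.monotonic() + timeout
    while True:
        if get_state() in allowed:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


# Simulated upstream run that succeeds on the third check.
states = iter(["running", "running", "success"])
print(wait_for_external(lambda: next(states), poke_interval=0.01))  # True
```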

What is TriggerDagRunOperator in Airflow?

TriggerDagRunOperator. The TriggerDagRunOperator is an easy way to implement cross-DAG dependencies. This operator allows you to have a task in one DAG that triggers another DAG in the same Airflow environment. Read more in-depth documentation about this operator on the Astronomer Registry.

What is start date in Airflow DAG?

The start date is the date at which your DAG starts being scheduled. This date can be in the past or in the future. Think of the start date as the start of the data interval you want to process, for example 01/01/2021 00:00. In addition to the start date, you need a schedule interval.


3 Answers

The TriggerDagRunOperator now has an execution_date parameter to set the execution date of the triggered run. Unfortunately, the parameter is not in the template fields. If it is added to the template fields (or if you subclass the operator and change its template_fields value), it will be possible to use it like this:

my_trigger_task = TriggerDagRunOperator(
    task_id='my_trigger_task',
    trigger_dag_id='triggered_dag_id',
    python_callable=conditionally_trigger,
    execution_date='{{ execution_date }}',
    dag=dag,
)

It has not been released yet, but you can see the source here: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/dagrun_operator.py

The commit that did the change was: https://github.com/apache/incubator-airflow/commit/089c996fbd9ecb0014dbefedff232e8699ce6283#diff-41f9029188bd5e500dec9804fed26fb4
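Whether a parameter is listed in template_fields matters because only those attributes are rendered before execute() runs; any other attribute keeps the literal template string. The toy model below uses the standard library's `string.Template` as a stand-in for Jinja (so the placeholder syntax is `$name` rather than `{{ name }}`); `ToyOperator` and `render_template_fields` are hypothetical names, not Airflow's.

```python
from string import Template


class ToyOperator:
    # Only attributes listed here get rendered, mirroring the role of
    # template_fields on an Airflow operator.
    template_fields = ("execution_date",)

    def __init__(self, execution_date: str, note: str):
        self.execution_date = execution_date
        self.note = note


def render_template_fields(op: ToyOperator, context: dict) -> None:
    # Render each listed attribute in place; everything else is untouched.
    for name in op.template_fields:
        raw = getattr(op, name)
        setattr(op, name, Template(raw).substitute(context))


op = ToyOperator(execution_date="$execution_date", note="$execution_date")
render_template_fields(op, {"execution_date": "2017-12-14 00:00:00"})
print(op.execution_date)  # 2017-12-14 00:00:00
print(op.note)            # $execution_date
```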

answered Oct 21 '22 at 22:10 by Ena


The following class extends TriggerDagRunOperator to allow passing the execution date as a string that is then converted back into a datetime. It's a bit hacky, but it is the only way I found to get the job done.

from datetime import datetime
import logging

from airflow import settings
from airflow.utils.state import State
from airflow.models import DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator, DagRunOrder

class MMTTriggerDagRunOperator(TriggerDagRunOperator):
    """
    MMT-patched for passing explicit execution date
    (otherwise it's hard to hook the datetime.now() date).
    Use when you want to explicitly set the execution date on the target DAG
    from the controller DAG.

    Adapted from Paul Elliot's solution on airflow-dev mailing list archives:
    http://mail-archives.apache.org/mod_mbox/airflow-dev/201711.mbox/%3cCAJuWvXgLfipPmMhkbf63puPGfi_ezj8vHYWoSHpBXysXhF_oZQ@mail.gmail.com%3e

    Parameters
    ------------------
    execution_date: str
        the custom execution date (jinja'd), rendered as 'YYYY-MM-DD HH:MM:SS'

    Usage Example:
    -------------------
    my_dag_trigger_operator = MMTTriggerDagRunOperator(
        execution_date="{{ execution_date }}",
        task_id='my_dag_trigger_operator',
        trigger_dag_id='my_target_dag_id',
        # python_callable receives (context, dag_run_obj) and returns the
        # DagRunOrder to trigger, or a falsy value to skip triggering
        python_callable=lambda context, dro: dro if random.getrandbits(1) else None,
        params={},
        dag=my_controller_dag
    )
    """
    # Only execution_date is rendered by Jinja before execute() runs
    template_fields = ('execution_date',)

    def __init__(
            self, trigger_dag_id, python_callable, execution_date,
            *args, **kwargs):
        super(MMTTriggerDagRunOperator, self).__init__(
            trigger_dag_id=trigger_dag_id, python_callable=python_callable,
            *args, **kwargs)
        # Set after the parent __init__ so the parent cannot overwrite it
        self.execution_date = execution_date

    def execute(self, context):
        # Convert the rendered template string back into a datetime
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                execution_date=run_id_dt,  # the datetime, not the raw string
                conf=dro.payload,
                external_trigger=True)
            logging.info("Creating DagRun {}".format(dr))
            session.add(dr)
            session.commit()
            session.close()
        else:
            logging.info("Criteria not met, moving on")
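The date handling at the top of execute() can be checked on its own, without Airflow. This sketch assumes the template renders to a `YYYY-MM-DD HH:MM:SS` string, as the format string in the class expects; other Airflow versions may render `{{ execution_date }}` differently (for example as ISO 8601 with a timezone offset), in which case `strptime` raises `ValueError` and the format would need adjusting. `parse_trigger_date` is a hypothetical helper.

```python
from datetime import datetime
from typing import Tuple


def parse_trigger_date(rendered: str) -> Tuple[datetime, str]:
    """Turn the rendered execution_date string into a datetime and a run_id."""
    run_id_dt = datetime.strptime(rendered, "%Y-%m-%d %H:%M:%S")
    # Same run_id scheme as MMTTriggerDagRunOperator: 'trig__' + ISO timestamp.
    return run_id_dt, "trig__" + run_id_dt.isoformat()


dt, run_id = parse_trigger_date("2017-12-14 00:00:00")
print(dt)      # 2017-12-14 00:00:00
print(run_id)  # trig__2017-12-14T00:00:00
```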

There is an issue you may run into when using this instead of execution_date=now(): the operator will throw a MySQL error if you try to start a DAG with the same execution_date twice. This is because dag_id and execution_date together form a unique index on the DAG run table, and two rows with the same index values cannot be inserted.

I can't think of a reason you would ever want to run the same DAG twice with the same execution_date in production anyway, but I ran into it while testing, so don't be alarmed if you see it. Simply clear the old run or use a different datetime.
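The failure comes from a uniqueness constraint on the (dag_id, execution_date) pair. It can be reproduced with SQLite from the standard library; the table below is a simplified, hypothetical stand-in for Airflow's DAG run table, not its real schema, and `create_run` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY,
        dag_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        run_id TEXT NOT NULL,
        UNIQUE (dag_id, execution_date)
    )
    """
)


def create_run(dag_id: str, execution_date: str) -> bool:
    """Insert a run; return False if (dag_id, execution_date) already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date, run_id) VALUES (?, ?, ?)",
                (dag_id, execution_date, "trig__" + execution_date),
            )
        return True
    except sqlite3.IntegrityError:
        return False


print(create_run("my_target_dag_id", "2017-12-14T00:00:00"))  # True
print(create_run("my_target_dag_id", "2017-12-14T00:00:00"))  # False (duplicate)
print(create_run("my_target_dag_id", "2017-12-15T00:00:00"))  # True
```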

answered Oct 21 '22 at 20:10 by 7yl4r


I improved MMTTriggerDagRunOperator a bit. Its execute() method now checks whether the DAG run already exists and, if it does, restarts it using Airflow's clear function. This lets us create dependencies between DAGs: being able to pass the execution date on to the triggered DAG opens up a whole universe of possibilities. I wonder why this is not the default behavior in Airflow.

    def execute(self, context):
        # Convert the rendered template string back into a datetime
        run_id_dt = datetime.strptime(self.execution_date, '%Y-%m-%d %H:%M:%S')
        dro = DagRunOrder(run_id='trig__' + run_id_dt.isoformat())
        dro = self.python_callable(context, dro)
        if dro:
            session = settings.Session()
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)

            if not trigger_dag.get_dagrun(run_id_dt):
                # No run for this execution date yet: create one
                dr = trigger_dag.create_dagrun(
                    run_id=dro.run_id,
                    state=State.RUNNING,
                    execution_date=run_id_dt,
                    conf=dro.payload,
                    external_trigger=True
                )
                logging.info("Creating DagRun {}".format(dr))
                session.add(dr)
                session.commit()
            else:
                # A run already exists: clear it so it is executed again
                trigger_dag.clear(
                    start_date=run_id_dt,
                    end_date=run_id_dt,
                    only_failed=False,
                    only_running=False,
                    confirm_prompt=False,
                    reset_dag_runs=True,
                    include_subdags=False,
                    dry_run=False
                )
                logging.info("Cleared DagRun of {} for {}".format(
                    self.trigger_dag_id, run_id_dt))

            session.close()
        else:
            logging.info("Criteria not met, moving on")
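The create-or-clear decision in this execute() can be modelled with a plain dictionary keyed by (dag_id, execution_date). This is a hypothetical in-memory stand-in (`Run`, `RunStore` and `trigger_or_clear` are illustrative names): "clearing" here just resets the stored run to a running state with no finished tasks, which only approximates what DAG.clear(reset_dag_runs=True) does in Airflow.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Set, Tuple


@dataclass
class Run:
    run_id: str
    state: str = "running"
    finished_tasks: Set[str] = field(default_factory=set)


RunStore = Dict[Tuple[str, datetime], Run]


def trigger_or_clear(store: RunStore, dag_id: str, execution_date: datetime) -> str:
    """Create a run for (dag_id, execution_date), or reset it if it already exists."""
    key = (dag_id, execution_date)
    existing = store.get(key)
    if existing is None:
        store[key] = Run(run_id="trig__" + execution_date.isoformat())
        return "created"
    # Re-run the same logical date instead of failing on the duplicate key.
    existing.state = "running"
    existing.finished_tasks.clear()
    return "cleared"


store: RunStore = {}
day = datetime(2017, 12, 14)
print(trigger_or_clear(store, "my_target_dag_id", day))  # created
store[("my_target_dag_id", day)].state = "failed"
print(trigger_or_clear(store, "my_target_dag_id", day))  # cleared
print(store[("my_target_dag_id", day)].state)            # running
```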
answered Oct 21 '22 at 21:10 by ozw1z5rd