 

How to get the latest execution time of a DAG run in Airflow

Tags:

airflow

I tried the code below, but I am still getting an issue:

import airflow.settings
from airflow.models import DagModel
from airflow.operators.python_operator import PythonOperator


def get_latest_execution_date(**kwargs):
    session = airflow.settings.Session()

    f = open("/home/Insurance/InsuranceDagsTimestamp.txt", "w+")

    try:
        Insurance_last_dag_run = session.query(DagModel)
        for Insdgrun in Insurance_last_dag_run:
            if Insdgrun is None:
                f.write(Insdgrun.dag_id + ",9999-12-31" + "\n")
            else:
                f.write(Insdgrun.dag_id + "," + Insdgrun.execution_date + "\n")
    except:
        session.rollback()
    finally:
        session.close()


t1 = PythonOperator(
    task_id='records',
    provide_context=True,
    python_callable=get_latest_execution_date,
    dag=dag)

Is there a way to fix this and get the latest DAG run time information?

asked Sep 06 '20 11:09 by Ravi


People also ask

What is execution time in Airflow?

The execution time in Airflow is not the actual run time but the start timestamp of the run's schedule period. For example, the first DAG run may have an execution time of 2019-12-05 07:00:00 even though it is actually executed on 2019-12-06.
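As a worked example of that arithmetic, here is a minimal plain-Python sketch. It assumes the DAG has a fixed daily schedule_interval expressed as a timedelta (cron schedules, catchup and time zones are ignored), and expected_run_start is a hypothetical helper, not an Airflow function. The scheduler triggers a run only after its period has ended, so the earliest actual start is the execution time plus one interval.

```python
from datetime import datetime, timedelta


def expected_run_start(execution_date: datetime, schedule_interval: timedelta) -> datetime:
    """Hypothetical helper: earliest time the scheduler triggers this run.

    Assumes a fixed timedelta schedule. A run is only triggered once its
    schedule period has closed, i.e. one interval after its execution time.
    """
    return execution_date + schedule_interval


# The example from the text, assuming a daily schedule at 07:00.
execution_date = datetime(2019, 12, 5, 7, 0, 0)
print(expected_run_start(execution_date, timedelta(days=1)))  # prints 2019-12-06 07:00:00
```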

What is a DAG run's logical date?

The "logical date" of a DAG run (called execution_date in Airflow versions before 2.2) denotes the start of its data interval, not the time the DAG is actually executed.


2 Answers

There are multiple ways to get the most recent run of a DAG. One way is to use Airflow's DagRun model:

from airflow.models import DagRun

def get_most_recent_dag_run(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0] if dag_runs else None


dag_run = get_most_recent_dag_run('fake-dag-id-001')
if dag_run:
    print(f'The most recent DagRun was executed at: {dag_run.execution_date}')

You can find more information on the DagRun model and its properties in the Airflow documentation.
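Sorting the whole list just to take its first element works, but max() with a default finds the newest run in a single pass and handles the empty case directly. The sketch below is plain Python so it runs without Airflow: FakeDagRun is a hypothetical stand-in exposing only the dag_id and execution_date attributes that the code above reads from real DagRun objects.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for airflow.models.DagRun (only two fields)."""
    dag_id: str
    execution_date: datetime


def most_recent(dag_runs: List[FakeDagRun]) -> Optional[FakeDagRun]:
    # One O(n) pass instead of an O(n log n) sort; None when there are no runs.
    return max(dag_runs, key=lambda run: run.execution_date, default=None)


runs = [
    FakeDagRun("fake-dag-id-001", datetime(2022, 10, 17)),
    FakeDagRun("fake-dag-id-001", datetime(2022, 10, 19)),
    FakeDagRun("fake-dag-id-001", datetime(2022, 10, 18)),
]
latest = most_recent(runs)
print(latest.execution_date)  # prints 2022-10-19 00:00:00
```

With Airflow installed, the same helper could be called on the list returned by DagRun.find(dag_id=dag_id).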

answered Oct 19 '22 02:10 by Josh


The PythonOperator's op_args parameter is templated, so you can pass a Jinja expression that Airflow renders at runtime.

Since the callable only needs to write the latest execution date to a file, you can implement it like this:

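# Airflow 1.10 import path; Airflow 2 also provides airflow.operators.python
from airflow.operators.python_operator import PythonOperator


def store_last_execution_date(latest_execution_date):
    '''Writes the latest execution date to a file, overwriting its contents.

    :param latest_execution_date: The last execution date of the DagRun,
        rendered by Jinja as a string.
    '''
    # Not named "execution_date", so it cannot clash with the
    # execution_date key that Airflow passes in the task context.
    with open("/home/Insurance/InsuranceDagsTimestamp.txt", "w") as f:
        f.write(latest_execution_date)


t1 = PythonOperator(
    task_id="records",
    python_callable=store_last_execution_date,
    # op_args is templated, so this expression is rendered at runtime
    op_args=[
        "{{ dag.get_latest_execution_date() }}",
    ],
    dag=dag,  # assumes a DAG object named dag is defined in this file
)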

answered Oct 19 '22 02:10 by Oluwafemi Sule
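The question's own output format (one dag_id,date line per DAG, with 9999-12-31 for DAGs that have never run) can be produced once the runs are available. Below is a minimal plain-Python sketch of that bookkeeping, assuming the (dag_id, execution_date) pairs have already been fetched, for example from DagRun rows rather than DagModel, which has no execution_date column. The names latest_per_dag, to_lines and NEVER_RUN, and the DAG ids "claims" and "policies", are hypothetical, not Airflow APIs.

```python
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple

NEVER_RUN = "9999-12-31"  # placeholder the question writes for DAGs with no runs


def latest_per_dag(
    dag_ids: Iterable[str],
    runs: Iterable[Tuple[str, datetime]],
) -> Dict[str, Optional[datetime]]:
    """Map every known dag_id to its newest execution_date, or None if it never ran."""
    latest: Dict[str, Optional[datetime]] = {dag_id: None for dag_id in dag_ids}
    for dag_id, execution_date in runs:
        current = latest.get(dag_id)
        if current is None or execution_date > current:
            latest[dag_id] = execution_date
    return latest


def to_lines(latest: Dict[str, Optional[datetime]]) -> List[str]:
    """Format one 'dag_id,date' line per DAG, sorted by dag_id."""
    # Use an explicit string form; str + datetime concatenation raises TypeError.
    return [
        f"{dag_id},{NEVER_RUN if when is None else when.isoformat()}"
        for dag_id, when in sorted(latest.items())
    ]


latest = latest_per_dag(
    dag_ids=["claims", "policies"],
    runs=[
        ("claims", datetime(2020, 9, 5)),
        ("claims", datetime(2020, 9, 6)),
    ],
)
print("\n".join(to_lines(latest)))
# claims,2020-09-06T00:00:00
# policies,9999-12-31
```

In a real task the resulting lines would be written to the question's timestamp file; how the (dag_id, execution_date) pairs are queried depends on your Airflow version and is left out here.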