I thought the macro prev_execution_date
listed here would get me the execution date of the last DAG run, but looking at the source code it seems to only get the last date based on the DAG schedule.
prev_execution_date = task.dag.previous_schedule(self.execution_date)
Is there any way via macros to get the execution date of the DAG when it doesn't run on a schedule?
The execution time in Airflow is not the actual run time, but rather the start timestamp of its schedule period. For example, the execution time of the first DAG run is 2019–12–05 7:00:00, though it is executed on 2019–12–06.
{{ next_ds }} the next execution date as YYYY-MM-DD if exists, else None.
You can use list_dag_runs command with the CLI to list the dag runs for a given dag ID. The information returned includes the state of each run.
Yes, you can define your own custom macro for this as follows:
# custom macro function
def get_last_dag_run(dag):
last_dag_run = dag.get_last_dagrun()
if last_dag_run is None:
return "no prev run"
else:
return last_dag_run.execution_date.strftime("%Y-%m-%d")
# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
schedule_interval='@daily',
user_defined_macros={
'last_dag_run_execution_date': get_last_dag_run
}
)
# example of using it in practice
print_vals = BashOperator(
task_id='print_vals',
bash_command='echo {{ last_dag_run_execution_date(dag) }}',
dag=dag
)
Note that the dag.get_last_run() is just one of the many functions available on the Dag object. Here's where I found it: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396
You can also tweak the formatting of the string for the date format, and what you want output if there is no previous run.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With