I want to create a snippet that passes the correct date based on whether the DAG was scheduled or whether it was triggered manually. The DAG runs monthly. The DAG generates a report (A SQL query) based on the data of the previous month.
If I run the DAG scheduled, I can fetch the previous month with the following jinja snippet:
execution_date.month
given that the DAG is scheduled at the end of the previous period (last month) the execution_date will correctly return the last month. However on manual runs this will return the current month (execution date will be the date of the manual trigger).
I want to write a simple macro that deals with this case. However I could not find a good way to programmatically query whether the DAG is triggered programmatically. The best I could come up with is to fetch the run_id
from the database (by creating a macro that has a DB session), check wheter the run_id
contains the word manual
. Is there a better way to solve this problem?
Airflow has API. The method you need is POST /api/experimental/dags/<DAG_ID>/dag_runs. With this method you also could pass config params for the dag run. We use Jenkins to trigger dags manually. If you are using Jenkins you could check our jenkins pipeine library.
The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.
A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run. Here's a basic example DAG: It defines four Tasks - A, B, C, and D - and dictates the order in which they have to run, and which tasks depend on what others.
You can load up the Airflow UI, navigate to your DAG, and select "Graph View" You can run airflow dags show, which renders it out as an image file We generally recommend you use the Graph View, as it will also show you the state of all the Task Instances within any DAG Run you select.
tl;dr: You can determine this with DagRun.external_trigger
.
I noticed that in the Tree View, there's an outline around runs that are scheduled, but not manual. That's because the latter has stroke-opacity: 0;
applied in CSS.
Searching the repo for this, I found how Airflow devs detect manual runs (5 year old line, so should work in older version as well):
.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
Searching for external_trigger
brings us to the DagRun
definition.
So if you were using, for example, a Python callback, you can have something like this (can be defined in the DAG, or a separate file):
def my_fun(context):
if context.get('dag_run').external_trigger:
print('manual run')
else:
print('scheduled run')
and in your Operator
set the parameter like:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
on_failure_callback=my_fun,
dag=dag,
)
I have tested something similar and it works.
I think you can also do something like if if {{ dag_run.external_trigger }}:
- but I haven't tested this, and I believe it would only work in that DAG's file.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With