I am learning Airflow and I looked at one of the example DAGs that ship with Airflow (example_branch_python_dop_operator_3.py).
In this example, the DAG branches one way if the minute (of the execution datetime) is an even number, and the other way if the minute is an odd number. Additionally, the DAG has depends_on_past set to True as a default value for all the tasks. The full code is:
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

# BranchPythonOperator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
    dag_id='example_branch_dop_operator_v3',
    schedule_interval='*/1 * * * *',
    default_args=args,
)


def should_run(**kwargs):
    print('------------- exec dttm = {} and minute = {}'.format(
        kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "dummy_task_1"
    else:
        return "dummy_task_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag,
)

dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
cond >> [dummy_task_1, dummy_task_2]
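The even/odd decision can be replayed outside Airflow. Here is a minimal sketch of the parity logic from should_run; pick_branch is a hypothetical helper, and it assumes pendulum >= 2 (the datetime library Airflow uses for execution dates):

import pendulum

def pick_branch(execution_date):
    # Same parity test as should_run(): even minute -> dummy_task_1,
    # odd minute -> dummy_task_2.
    return "dummy_task_1" if execution_date.minute % 2 == 0 else "dummy_task_2"

print(pick_branch(pendulum.datetime(2020, 1, 1, 0, 0)))  # dummy_task_1 (minute 0)
print(pick_branch(pendulum.datetime(2020, 1, 1, 0, 1)))  # dummy_task_2 (minute 1)

So with the */1 * * * * schedule, each branch task should be selected on alternating runs, and the other one skipped.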
I would have expected, since depends_on_past is True, that after the first DAG Run the tasks would no longer be able to start: each task would look at the status of the previous task, see that it was skipped (which is not success), and essentially hang without a status.
However, that is not what happened. Here are the results in Tree View:
As you can see, all of the selected tasks run in every DAG Run. Why is this happening? Do I misunderstand what depends_on_past means? I thought each task looked at the status of the task with the same task_id in the previous DAG Run.
To get this to run, I simply turned the DAG on in the main interface, so I believe these are scheduled runs.
From the official docs for trigger rules:

depends_on_past (boolean): when set to True, keeps a task from getting triggered if the previous schedule for the task hasn't succeeded.
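Note that depends_on_past is an operator-level setting: the DAG in the question applies it to every task through default_args, but it can also be set (or overridden) per task. A minimal sketch, assuming the dag object from the question above and hypothetical task ids:

from airflow.operators.dummy_operator import DummyOperator

# depends_on_past=True: this task waits on its own previous instance
strict = DummyOperator(task_id='strict', depends_on_past=True, dag=dag)

# depends_on_past=False: this task overrides the default_args value
# and ignores its previous instances entirely
relaxed = DummyOperator(task_id='relaxed', depends_on_past=False, dag=dag)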
From the changelog for Airflow 1.7.1 (2016-05-19):

- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True
It looks like the condition is checked here:

airflow/ti_deps/deps/prev_dagrun_dep.py (master branch)

line 75: if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
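In other words, a SKIPPED previous instance satisfies depends_on_past just like a SUCCESS one. A simplified sketch of that check (not verbatim from prev_dagrun_dep.py; previous_run_ok is a hypothetical name):

from airflow.utils.state import State

def previous_run_ok(previous_ti):
    # No previous task instance (first run): nothing to depend on.
    if previous_ti is None:
        return True
    # SKIPPED is accepted alongside SUCCESS, so a branch task that was
    # skipped in the previous run does not block the next run.
    return previous_ti.state in {State.SKIPPED, State.SUCCESS}

That is why the branch tasks in the example never hang: whichever task was not selected ends the run as skipped, and skipped counts the same as success when depends_on_past is evaluated.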