In my DAG, I have some tasks that should only be run on Saturdays. Therefore I used a BranchPythonOperator to branch between the tasks for Saturdays and a DummyOperator. After that, I join both branches and want to run other tasks.
The workflow looks like this: dummy1 runs first, then branch_on_saturday picks either dummy2 (the Saturday-only tasks) or today_is_not_saturday, and both branches join at dummy3. I set the trigger rule for dummy3 to 'one_success' and everything works fine.
The problem I encountered is when something upstream of the BranchPythonOperator fails:
The BranchPythonOperator and the branches correctly have the state 'upstream_failed', but the task joining the branches becomes 'skipped', and therefore the whole workflow shows 'success'.
I tried using 'all_success' as the trigger rule; then it works correctly if something fails (the whole workflow fails), but if nothing fails, dummy3 gets skipped.
I also tried 'all_done' as the trigger rule; then it works correctly if nothing fails, but if something fails, dummy3 still gets executed.
My test code looks like this:
from datetime import datetime, date

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          description='Test branches',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))


def python1():
    # Raise to simulate an upstream failure; comment this out to test the
    # success path.
    raise Exception('Test failure')


dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)

dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag
)

# The join task whose trigger rule is the crux of the question.
dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    trigger_rule='one_success'
)


def is_saturday():
    # weekday() counts from Monday=0, so Saturday is 5 (6 would be Sunday)
    if date.today().weekday() == 5:
        return 'dummy2'
    else:
        return 'today_is_not_saturday'


branch_on_saturday = BranchPythonOperator(
    task_id='branch_on_saturday',
    python_callable=is_saturday,
    dag=dag)

not_saturday = DummyOperator(
    task_id='today_is_not_saturday',
    dag=dag
)

dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3
I just figured out an ugly workaround:
dummy4 represents a task that I actually need to run; dummy5 is just a dummy. dummy3 still has the trigger rule 'one_success'. Now dummy3 and dummy4 run if there is no upstream failure, while dummy5 'runs' if the day is not Saturday and gets skipped if it is, which means the DAG is marked as success in both cases. If there is a failure upstream, dummy3 and dummy4 get skipped, dummy5 gets marked as 'upstream_failed', and the DAG is marked as failed.
This workaround makes my DAG run as I want it to, but I'd still prefer a solution without some hacky workaround.
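In code, the wiring looks roughly like this (a sketch on top of the test DAG above; dummy4 stands in for a real task):

# Sketch of the workaround wiring; dummy4 stands in for a task that actually
# needs to run, dummy5 exists only to carry the failure signal.
dummy4 = DummyOperator(task_id='dummy4', dag=dag)
dummy5 = DummyOperator(task_id='dummy5', dag=dag)  # default 'all_success' rule

dummy1 >> branch_on_saturday >> dummy2 >> dummy3 >> dummy4
branch_on_saturday >> not_saturday >> dummy3
# dummy5 runs when the not-Saturday branch ran, is skipped on Saturdays, and
# becomes 'upstream_failed' (failing the DAG) when something upstream fails.
not_saturday >> dummy5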
One workaround that you could use is to put the second part of your DAG in a SubDAG, as I did in the following code illustrating your example: https://gist.github.com/cosenal/cbd38b13450b652291e655138baa1aba
It works as expected and it is arguably cleaner than your workaround, as you don't have any additional subsidiary dummy operators. However, you lose the flat structure and now have to zoom into the SubDAG to see the details of the inner one.
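The full code is in the gist above; the shape of the pattern is roughly this (a minimal sketch only; names like join_subdag and second_part are illustrative, not taken from the gist):

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator

def join_subdag(parent_dag_id, child_id, start_date, schedule_interval):
    # SubDAG ids must follow the '<parent_dag_id>.<child_id>' convention.
    subdag = DAG(dag_id='{}.{}'.format(parent_dag_id, child_id),
                 start_date=start_date,
                 schedule_interval=schedule_interval)
    # The branch and the 'one_success' join live inside this subdag, so the
    # subdag's own run ends as plain success or failure.
    DummyOperator(task_id='dummy3', dag=subdag, trigger_rule='one_success')
    return subdag

# In the parent DAG the whole second part collapses into a single operator
# with the default 'all_success' trigger rule: a failure in dummy1 marks it
# 'upstream_failed' and the parent DAG correctly ends as failed.
second_part = SubDagOperator(
    task_id='second_part',
    subdag=join_subdag(dag.dag_id, 'second_part',
                       dag.start_date, dag.schedule_interval),
    dag=dag
)
dummy1 >> second_part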
A more general observation: after experimenting with your DAG, I came to the conclusion that Airflow needs something like a JoinOperator to replace your dummy3 operator. Let me explain. The behaviour you describe comes from the fact that the success of a DAG is based only on the last operator being successful (or skipped!).
The following DAG, which ends with 'success' status, is an MWE that backs up the claim above.
# This MWE reuses the `dag` object defined in the question's test code.
def python1():
    raise Exception('Test failure')

dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)

dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag,
    trigger_rule='one_success'
)

dummy1 >> dummy2
It would be cool to have a JoinOperator that fires only if one of the immediate parents is successful and all the others are skipped, without having to use the trigger_rule argument.
Alternatively, something that would fix the issue you faced would be a trigger rule all (success | skipped), which you could apply to dummy3. Unfortunately, I don't think you can create custom trigger rules in Airflow yet.
EDIT: in the first version of this answer I claimed that the trigger rules one_success and all_success fire according to the states of all the ancestors of the operator in the DAG, not just its immediate parents. This doesn't match the documentation, and in fact it is invalidated by the following experiment: https://gist.github.com/cosenal/b607825539aa0d308f10f3095e084fac
Setting the trigger rule for dummy3 to 'none_failed' makes it end with the expected status in all cases; see https://airflow.apache.org/concepts.html#trigger-rules
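Applied to the test DAG from the question, it is a one-line change (assuming a version of Airflow that already has 'none_failed'; see the edit below):

dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    # 'none_failed' fires as long as no parent failed or is 'upstream_failed';
    # skipped parents are fine, so dummy3 runs after either branch and is
    # itself marked 'upstream_failed' when something before the branch fails.
    trigger_rule='none_failed'
)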
EDIT: it looks like the 'none_failed' trigger rule did not yet exist when this question was asked and answered; it was added in November 2018, see https://github.com/apache/airflow/pull/4182