
Airflow task after BranchPythonOperator does not fail and succeed correctly

Tags: python, airflow

In my DAG, I have some tasks that should only be run on Saturdays. Therefore I used a BranchPythonOperator to branch between the tasks for Saturdays and a DummyTask. After that, I join both branches and want to run other tasks.

The workflow looks like this: [screenshot of the DAG]
Here I set the trigger rule for dummy3 to 'one_success' and everything works fine.

The problem I encountered is when something upstream of the BranchPythonOperator fails: [screenshot of the failed DAG run]
The BranchPythonOperator and the branches correctly have the state 'upstream_failed', but the task joining the branches becomes 'skipped'; therefore the whole workflow shows 'success'.

I tried using 'all_success' as the trigger rule; then, if something fails, the whole workflow correctly fails, but if nothing fails, dummy3 gets skipped.

I also tried 'all_done' as the trigger rule; then it works correctly if nothing fails, but if something fails, dummy3 still gets executed.
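The mismatch between these rules and the join task can be illustrated by simulating their documented semantics over the join task's immediate upstream states in plain Python (no Airflow needed; the helper functions below are illustrative, not an Airflow API):

```python
# Simulate Airflow trigger rules over the states of a task's direct upstream tasks.
# Relevant states here: 'success', 'skipped', 'upstream_failed'.

def one_success(states):
    # Fires as soon as at least one upstream task succeeded.
    return 'success' in states

def all_success(states):
    # Fires only if every upstream task succeeded; a skipped branch blocks it.
    return all(s == 'success' for s in states)

def all_done(states):
    # Fires once all upstream tasks are done, regardless of outcome.
    return all(s in ('success', 'skipped', 'failed', 'upstream_failed')
               for s in states)

# Happy path: one branch succeeded, the other was skipped by the branch operator.
happy = ['success', 'skipped']
# Upstream failure: both branches end up 'upstream_failed'.
broken = ['upstream_failed', 'upstream_failed']

print(one_success(happy), one_success(broken))  # True False -> join is skipped, DAG still green
print(all_success(happy), all_success(broken))  # False False -> join skipped even on success
print(all_done(happy), all_done(broken))        # True True  -> join runs even after a failure
```

None of the three rules distinguishes "a branch was deliberately skipped" from "a branch was skipped because something failed upstream", which is exactly the gap described above.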

My test code looks like this:

from datetime import datetime, date
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          description='Test branches',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))


def python1():
    raise Exception('Test failure')
    # print('Test success')


dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)


dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag
)


dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    trigger_rule='one_success'
)


def is_saturday():
    # date.weekday() counts Monday as 0, so Saturday is 5 (6 would be Sunday)
    if date.today().weekday() == 5:
        return 'dummy2'
    else:
        return 'today_is_not_saturday'


branch_on_saturday = BranchPythonOperator(
    task_id='branch_on_saturday',
    python_callable=is_saturday,
    dag=dag)


not_saturday = DummyOperator(
    task_id='today_is_not_saturday',
    dag=dag
)

dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3

EDIT

I just figured out an ugly workaround: [screenshot of the extended DAG]
dummy4 represents a task that I actually need to run, dummy5 is just a dummy.
dummy3 still has the trigger rule 'one_success'.

Now dummy3 and dummy4 run if there is no upstream failure, and dummy5 'runs' if the day is not Saturday and gets skipped if it is, which means the DAG is marked as success in both cases.
If there is a failure upstream, dummy3 and dummy4 get skipped and dummy5 gets marked as 'upstream_failed' and the DAG is marked as failed.

This workaround makes my DAG run as I want it to, but I'd still prefer a solution without some hacky workaround.

asked Jan 28 '23 by Christopher Beck

2 Answers

One workaround that you could use is to put the second part of your DAG in a SubDAG, like I did in the following code illustrating your example: https://gist.github.com/cosenal/cbd38b13450b652291e655138baa1aba

It works as expected and is arguably cleaner than your workaround, as you don't need any additional subsidiary dummy operators. However, you lose the flat structure, and you now have to zoom into the SubDAG to see the details of the inner DAG.


A more general observation: after experimenting with your DAG, I came to the conclusion that Airflow needs something like a JoinOperator to replace your Dummy3 operator. Let me explain. The behaviour you describe comes from the fact that the success of a DAG is based only on the last operator being successful (or skipped!).

The following DAG, which ends with 'success' status, is a minimal working example (MWE) that backs up the claim above.

# imports and DAG setup mirror the question's example
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_last_operator',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))


def python1():
    raise Exception('Test failure')


dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)

dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag,
    trigger_rule='one_success'
)

dummy1 >> dummy2

It would be cool to have a JoinOperator that fires only if one of the immediate parents is successful and all the others are skipped, without having to use the trigger_rule argument.

Alternatively, something that would fix the issue you faced would be a trigger rule all (success | skipped), which you could apply to Dummy3. Unfortunately, I don't think you can create custom trigger rules in Airflow yet.
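The semantics of that hypothetical rule are easy to state in plain Python (the function name below is made up for illustration, not an Airflow API):

```python
def all_success_or_skipped(states):
    # Hypothetical trigger rule: fire only if every immediate parent either
    # succeeded or was skipped; any failure upstream propagates to the join.
    return all(s in ('success', 'skipped') for s in states)

print(all_success_or_skipped(['success', 'skipped']))          # True: join runs after a branch
print(all_success_or_skipped(['upstream_failed', 'skipped']))  # False: the failure propagates
```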

EDIT: in the first version of this answer, I claimed that the trigger rules one_success and all_success fire according to the states of all the ancestors of the operator in the DAG, not just its immediate parents. This doesn't match the documentation, and in fact it is invalidated by the following experiment: https://gist.github.com/cosenal/b607825539aa0d308f10f3095e084fac

answered Jan 31 '23 by Alessandro Cosentino


Setting the trigger rule for dummy3 to 'none_failed' makes it end with the expected status in all cases.

see https://airflow.apache.org/concepts.html#trigger-rules
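In terms of upstream states, 'none_failed' fires as long as no parent failed or inherited a failure; skipped parents do not block the join. A plain-Python sketch of that documented semantics (the helper function is illustrative, not an Airflow API):

```python
def none_failed(states):
    # 'none_failed': run unless some upstream task failed or was itself
    # 'upstream_failed'; skipped upstream tasks do not block the join.
    return all(s not in ('failed', 'upstream_failed') for s in states)

print(none_failed(['success', 'skipped']))                  # True: join runs on the happy path
print(none_failed(['upstream_failed', 'upstream_failed']))  # False: the failure propagates to dummy3
```

This is exactly the distinction the question's one_success/all_success/all_done attempts could not express, which is why trigger_rule='none_failed' on dummy3 gives the expected status in both scenarios.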


EDIT: it looks like the 'none_failed' trigger rule did not yet exist when this question was asked and answered; it was added in November 2018:

see https://github.com/apache/airflow/pull/4182

answered Jan 31 '23 by Géraud