I am using a LocalExecutor and my DAG has 3 tasks, where task C is dependent on task A. Task B and task A can run in parallel, something like below:
A-->C
B
Task A has failed, but task B ran fine. Task C is yet to run, since task A failed.
My question is: how do I re-run task A alone, so that task C runs once task A completes and the Airflow UI marks them as successful?
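For reference, here's a minimal sketch of my layout; the dag id and the DummyOperator placeholders are just for illustration:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # EmptyOperator in Airflow >= 2.3

# Hypothetical reproduction of the layout: A --> C, with B independent.
with DAG(
    'abc_example',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    a = DummyOperator(task_id='A')
    b = DummyOperator(task_id='B')
    c = DummyOperator(task_id='C')

    a >> c  # C waits for A; B has no dependencies and runs in parallel
```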
Click the "select all" checkbox at the top of the list to select all of the queued task instances. Then, in the "Actions" menu, select "Clear" and apply it to all of them. Confirm your choice to clear the queued tasks, and Airflow should immediately prepare to run them again.
On the DAG Runs page, find the DAG runs that refuse to run, select them, and open the "With selected" menu. From that menu, click the "Delete" command. After that, Airflow should recreate the missing task instances and, ideally, start executing them.
An upstream failure stops downstream tasks from being executed under the default trigger rule 'all_success', which requires all upstream tasks to have succeeded. Note that Airflow does continue executing tasks that have no dependency on the failed task (task B in your example).
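As an aside, if you wanted task C to run regardless of task A's outcome, you could override the trigger rule. A minimal sketch (task names illustrative):

```python
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Default: C runs only if every upstream task succeeded ('all_success').
c = DummyOperator(task_id='C')

# Override: run once all upstream tasks have finished, whether they failed or not.
c_lenient = DummyOperator(task_id='C_lenient', trigger_rule=TriggerRule.ALL_DONE)
```

That is not what the question asks for (we still want to re-run A), but it shows why C is blocked by default.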
In the UI:
Open the Graph (or Tree) view for the affected DAG run, click on task A, and hit "Clear", making sure the "Downstream" option is selected so task C's state is cleared as well.
This will let task A run again, and if it succeeds, task C should run. This works because when you clear a task's status, the scheduler treats it as if it had not run before for this DAG run.
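The same one-off fix can be done from the command line with `airflow tasks clear`; the dag id below is a placeholder, and A is the task id from the question:

```bash
# Clear task A and everything downstream of it (-d) for the given run,
# answering 'yes' to the confirmation prompt (-y):
airflow tasks clear -s 2021-01-01 -t A -d -y my_dag_id

# Drop -d to clear only task A itself:
airflow tasks clear -s 2021-01-01 -t A -y my_dag_id
```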
Here's an alternate solution, where a failure callback clears and retries certain tasks automatically. If you only want to clear a certain task, you would not use the -d (downstream) flag:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator


def clear_upstream_task(context):
    """Failure callback: clear t1 and everything downstream of it so the chain re-runs."""
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(task_id='t0')
    t1 = DummyOperator(task_id='t1')
    t2 = DummyOperator(task_id='t2')
    # t3 always fails ("exit 123"); its failure callback clears t1 and downstream,
    # which makes t1, t2 and t3 run again for this DAG run.
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
```