
How to restart a failed task on Airflow

I am using a LocalExecutor and my DAG has 3 tasks, where task(C) is dependent on task(A). Task(B) and task(A) can run in parallel, something like below:

A-->C

B

Task(A) has failed, but task(B) ran fine. Task(C) is yet to run, as task(A) failed.

My question is: how do I re-run task(A) alone, so that task(C) runs once task(A) completes, and the Airflow UI marks them as successful?
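For reference, a minimal sketch of that layout (the DummyOperator stand-ins and task ids are placeholders, not the asker's actual operators):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

with DAG('example_dag', start_date=datetime(2017, 4, 1), schedule_interval=None) as dag:
    task_a = DummyOperator(task_id='task_a')
    task_b = DummyOperator(task_id='task_b')  # no dependencies; runs in parallel
    task_c = DummyOperator(task_id='task_c')

    task_a >> task_c  # task_c runs only after task_a succeeds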

asked Apr 07 '17 by Chetan J

People also ask

How do you clear an Airflow task?

Click on the "select all" checkbox at the top of the list to select all of the queued tasks. Now, in the "Actions" menu, select "Clear" and apply it to all of the queued tasks. Confirm your choice to Clear the queued tasks. Airflow should immediately prepare to run the queued tasks.
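If you prefer the CLI, roughly the same effect is available through airflow tasks clear (Airflow 2.x syntax; the dag id and dates below are placeholders):

# Clear every task instance of my_dag in the given window, skipping the
# confirmation prompt
airflow tasks clear my_dag -s 2021-01-01 -e 2021-01-07 -y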

How do I restart the Airflow DAG?

On this page, we find the DAG runs we want to restart, select them, and click the 'With selected' menu option. In the new menu, we click the 'Delete' command. After that, Airflow should recreate the missing task instances and start executing them.
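Deleting a DAG run can also be scripted; a sketch assuming Airflow 2.x with the stable REST API enabled and basic auth configured (the dag id, run id, and credentials here are hypothetical):

# Remove one DAG run; for scheduled runs, the scheduler should then recreate it
curl -X DELETE \
  "http://localhost:8080/api/v1/dags/my_dag/dagRuns/manual__2021-01-01T00:00:00+00:00" \
  --user "admin:admin"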

What is Airflow upstream failed?

An upstream failure stops downstream tasks from being executed with the default trigger rule 'all_success', which requires all upstream tasks to be successful. Note that Airflow does continue executing tasks which do not have any dependency on the failed task (fetch_weather and process_weather).
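The trigger rule is configured per task; a minimal sketch (the dag and task names are made up) contrasting the default with a rule that ignores upstream failures:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG('trigger_rule_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Default trigger_rule='all_success': blocked if any upstream task failed
    strict = DummyOperator(task_id='strict')

    # 'all_done': runs once upstream tasks finish, whether they succeeded or failed
    always_after = DummyOperator(task_id='always_after', trigger_rule='all_done')

    strict >> always_after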


2 Answers

In the UI:

  1. Go to the DAG, and the DAG run, that you want to change
  2. Click on Graph View
  3. Click on task A
  4. Click "Clear"

This will let task A run again, and if it succeeds, task C should run. This works because when you clear a task's status, the scheduler will treat it as if it hadn't run before for this dag run.
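The same clear is available from the CLI; a sketch assuming Airflow 2.x syntax and a hypothetical dag id my_dag (on 1.x the command is airflow clear):

# Clear only task A for one execution date; the scheduler re-runs it,
# and task C follows once it succeeds
airflow tasks clear my_dag -t task_A -s 2017-04-07 -e 2017-04-07 -y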

answered Sep 18 '22 by jhnclvr


Here's an alternative solution, where you have the DAG clear and retry certain tasks automatically. If you only want to clear a single task, omit the -d (downstream) flag:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


def clear_upstream_task(context):
    # Failure callback: shells out to the Airflow CLI to clear t1 and
    # everything downstream of it (-d), skipping the prompt (-y)
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(task_id='t0')
    t1 = DummyOperator(task_id='t1')
    t2 = DummyOperator(task_id='t2')
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',  # always fails, triggering the callback
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3
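Note that the callback runs the CLI synchronously inside the failing task's process via clear_tasks.execute(context), so once t3 fails, t1 and everything downstream of it (because of -d) are cleared and then picked up again by the scheduler on its next pass.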
answered Sep 19 '22 by phenderbender