
Airflow Get Retry Number

Tags:

airflow

In my Airflow DAG I have a task that needs to know whether it is running for the first time or as a retry. I need to adjust the logic in the task if it's a retry attempt.

I have a few ideas on how I could store the number of retries for the task, but I'm not sure if any of them are legitimate or if there's an easier built-in way to get this information within the task.

  • I'm wondering if I can just have an integer variable inside the DAG that I increment every time the task runs. Then, if the task is rerun, I could check whether the value of the variable is greater than 1, which would mean it's a retry run. But I'm not sure mutable global variables work that way in Airflow, since there can be multiple workers for different tasks (I'm not sure about this, though).

  • Write it to an XCom variable?

Kyle Bridenstine asked Aug 08 '18 22:08



1 Answer

The retry number is available from the task instance, which is available via the macro {{ task_instance }}. https://airflow.apache.org/code.html#default-variables

If you are using the PythonOperator, simply add provide_context=True to your operator kwargs, and then in the callable read kwargs['task_instance'].try_number.
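As a minimal sketch of the PythonOperator route, the callable below reads try_number from the context and branches on it. The function name my_callable and the stand-in task instance used for the local check are hypothetical; in a real DAG Airflow supplies task_instance itself (in Airflow 2 and later the context is passed without provide_context=True). It assumes try_number is 1 on the first attempt.

```python
from types import SimpleNamespace


def my_callable(**kwargs):
    """Hypothetical task callable: branch on whether this run is a retry."""
    ti = kwargs["task_instance"]  # supplied by Airflow in the task context
    # Assumption: try_number is 1 on the first attempt and grows per retry.
    is_retry = ti.try_number > 1
    if is_retry:
        return "retry attempt {}".format(ti.try_number)
    return "first attempt"


# Local check with a stand-in object instead of a real TaskInstance.
fake_ti = SimpleNamespace(try_number=2)
print(my_callable(task_instance=fake_ti))
```

In a DAG this function would be passed as python_callable to the PythonOperator; the stand-in object only exists so the logic can be checked outside Airflow.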

Otherwise you can do something like:

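# Airflow 1.x import path; in Airflow 2 use airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

# Echo the current try number; `dag` is assumed to be defined elsewhere
t = BashOperator(
    task_id='try_number_test',
    bash_command='echo "{{ task_instance.try_number }}"',
    dag=dag)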

Edit:

When the task instance is cleared, Airflow sets its max_tries to the current try_number plus the configured retries value. So you could do something like:

ti = kwargs['task_instance']  # or however you obtain the task_instance object
is_first = ti.max_tries - ti.task.retries + 1 == ti.try_number

Airflow increments try_number by 1 when the task starts running, so I imagine you'd need the + 1 when subtracting the configured retries value from max_tries. But I didn't test that to confirm.
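The check above can be wrapped in a small helper and exercised without Airflow. This is only a sketch of the answer's untested formula: the helper name is_first_attempt and the stand-in objects are hypothetical, and the attribute values are made up for illustration, not taken from a real run.

```python
from types import SimpleNamespace


def is_first_attempt(ti):
    """Apply the answer's (untested) formula to a task-instance-like object."""
    # max_tries minus the configured retries, plus 1, is taken to be the
    # try_number of the first attempt after the most recent clear.
    return ti.max_tries - ti.task.retries + 1 == ti.try_number


# Stand-in values only; not taken from a real Airflow run.
fresh = SimpleNamespace(max_tries=2, try_number=1, task=SimpleNamespace(retries=2))
retried = SimpleNamespace(max_tries=2, try_number=2, task=SimpleNamespace(retries=2))
print(is_first_attempt(fresh), is_first_attempt(retried))
```

Whether the + 1 offset matches a given Airflow version would still need to be confirmed against a real cleared task instance.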

cwurtz answered Sep 20 '22 14:09