In my Airflow DAG I have a task that needs to know whether it is running for the first time or as a retry. I need to adjust the task's logic if it's a retry attempt.
I have a few ideas on how I could store the number of retries for the task, but I'm not sure if any of them are legitimate or if there's an easier built-in way to get this information within the task.
I'm wondering if I can just have an integer variable inside the DAG that I increment every time the task runs. Then, if the task is rerun, I could check whether the value is greater than 1, which would mean it's a retry run. But I'm not sure mutable global variables work that way in Airflow, since there can be multiple workers for different tasks (I'm not sure about this, though).
Write it in an XCOM variable?
Retry logic/parameters will take place before failure logic/parameters. So if you have a task set to retry twice, it will attempt to run again two times (executing on_retry_callback each time) before failing (and then executing on_failure_callback).
retries (int) – the number of retries that should be performed before failing the task.
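As a rough illustration of how these settings fit together, the sketch below defines two callbacks and a default_args dictionary. The names notify_retry, notify_failure and the events list are hypothetical; retries, retry_delay, on_retry_callback and on_failure_callback are operator arguments, and Airflow calls each callback with the task's context dictionary.

```python
from datetime import timedelta

# Hypothetical in-memory log so the example has something observable.
events = []


def notify_retry(context):
    """Runs each time the task fails and is scheduled for another try."""
    ti = context["task_instance"]
    events.append(("retry", ti.task_id))


def notify_failure(context):
    """Runs once, after the configured retries are exhausted."""
    ti = context["task_instance"]
    events.append(("failed", ti.task_id))


# Passed as DAG(default_args=default_args) or directly as operator kwargs.
default_args = {
    "retries": 2,                         # two extra attempts after the first
    "retry_delay": timedelta(minutes=5),  # wait between attempts
    "on_retry_callback": notify_retry,
    "on_failure_callback": notify_failure,
}
```

With retries set to 2, a task that keeps failing would be expected to trigger notify_retry twice and notify_failure once.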
If you want to control your task's state from within custom Task/Operator code, Airflow provides two special exceptions you can raise: AirflowSkipException will mark the current task as skipped. AirflowFailException will mark the current task as failed ignoring any remaining retry attempts.
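A minimal sketch of raising these two exceptions from a Python callable follows. The function load_rows and its rows argument are hypothetical, and the fallback classes exist only so the snippet can be run where Airflow is not installed; they are not part of Airflow.

```python
try:
    from airflow.exceptions import AirflowFailException, AirflowSkipException
except ImportError:
    # Stand-ins (assumption) so the sketch runs without Airflow installed.
    class AirflowFailException(Exception):
        pass

    class AirflowSkipException(Exception):
        pass


def load_rows(rows, **context):
    """Hypothetical task body that picks the task state itself."""
    if rows is None:
        # Retrying cannot help here, so fail now and ignore remaining retries.
        raise AirflowFailException("input is missing; retrying will not help")
    if not rows:
        # Nothing to do: mark the task as skipped instead of failed.
        raise AirflowSkipException("nothing to load")
    return len(rows)
```

Any other exception raised in the callable would go through the normal retry path.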
To rerun a task in Airflow you clear the task status to update the max_tries and current task instance state values in the metastore. After the task reruns, the max_tries value updates to 0, and the current task instance state updates to None.
The retry number is available from the task instance, which is available via the macro {{ task_instance }}: https://airflow.apache.org/code.html#default-variables
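If you are using the PythonOperator, simply add provide_context=True to your operator kwargs, and then in the callable read kwargs['task_instance'].try_number. (In Airflow 2.x the context is passed to the callable automatically, so provide_context is no longer needed.)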
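For the PythonOperator case, a callable might look roughly like the sketch below. The function name branch_on_retry and the returned strings are hypothetical, and it assumes try_number is 1 on the first attempt while the task is running.

```python
def branch_on_retry(**kwargs):
    """Hypothetical callable that changes behaviour on retry attempts."""
    ti = kwargs["task_instance"]
    if ti.try_number > 1:
        # Retry path: put cleanup or alternative logic here.
        return "retry attempt {}".format(ti.try_number)
    # Normal path for the first attempt.
    return "first attempt"


# Hypothetical wiring, Airflow 1.x style (left commented out so the
# snippet does not need Airflow or a DAG object to run):
# PythonOperator(task_id="retry_aware", python_callable=branch_on_retry,
#                provide_context=True, retries=2, dag=dag)
```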
Otherwise you can do something like:
t = BashOperator(
    task_id='try_number_test',
    bash_command='echo "{{ task_instance.try_number }}"',
    dag=dag)
Edit:
When the task instance is cleared, it will set max_tries to the current try_number plus the retries value. So you could do something like:
ti = kwargs['task_instance']  # or however you obtain the task_instance object
is_first = ti.max_tries - ti.task.retries + 1 == ti.try_number
Airflow increments try_number by 1 when the task runs, so I imagine you'd need the + 1 when subtracting the configured retries value from max_tries. But I didn't test that to confirm.
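To make the arithmetic concrete, here is a small sketch that wraps the expression above in a helper and tries it on stand-in objects. The helper names is_first_attempt and fake_ti are hypothetical, the objects are plain SimpleNamespace values rather than real task instances, and the numbers assume max_tries starts out equal to retries on a fresh run. Like the expression itself, this has not been checked against a live Airflow deployment.

```python
from types import SimpleNamespace


def is_first_attempt(ti):
    """True when try_number is the first try of the current run cycle."""
    return ti.max_tries - ti.task.retries + 1 == ti.try_number


def fake_ti(try_number, max_tries, retries):
    """Stand-in exposing only the attributes the check reads."""
    return SimpleNamespace(
        try_number=try_number,
        max_tries=max_tries,
        task=SimpleNamespace(retries=retries),
    )


# Fresh run with retries=2 (assumed max_tries == 2): 2 - 2 + 1 == 1
fresh = fake_ti(try_number=1, max_tries=2, retries=2)
# Second attempt of the same run: 2 - 2 + 1 != 2
retry = fake_ti(try_number=2, max_tries=2, retries=2)

print(is_first_attempt(fresh))
print(is_first_attempt(retry))
```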