 

Why are my Airflow tasks being "externally set to failed"?

Tags:

airflow

I'm using Airflow 2.0.0, and my tasks are sporadically being killed "externally" after running for a few seconds or minutes. The tasks usually run successfully (both for manual runs initiated via airflow tasks test ... and for scheduled DAG runs), so I believe this is not related to my DAG code.

When tasks fail, this seems to be the key error from the task logs:

{local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.

The full log from one failed attempt:

[2020-12-20 11:26:11,448] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]>
[2020-12-20 11:26:11,473] {taskinstance.py:1017} INFO - 
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,473] {taskinstance.py:1018} INFO - Starting attempt 3 of 3
[2020-12-20 11:26:11,473] {taskinstance.py:1019} INFO - 
--------------------------------------------------------------------------------
[2020-12-20 11:26:11,506] {taskinstance.py:1038} INFO - Executing <Task(PythonOperator): run_backupper> on 2020-12-19T02:00:00+00:00
[2020-12-20 11:26:11,509] {standard_task_runner.py:51} INFO - Started process 12059 to run task
[2020-12-20 11:26:11,515] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--job-id', '22', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/backupper/daily_backups.py', '--cfg-path', '/tmp/tmpnfmqtorg']
[2020-12-20 11:26:11,517] {standard_task_runner.py:76} INFO - Job 22: Subtask run_backupper
[2020-12-20 11:26:11,609] {logging_mixin.py:103} INFO - Running <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [running]> on host localhost
[2020-12-20 11:26:11,742] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=<user>
AIRFLOW_CTX_DAG_ID=daily_backups
AIRFLOW_CTX_TASK_ID=run_backupper
AIRFLOW_CTX_EXECUTION_DATE=2020-12-19T02:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2020-12-19T02:00:00+00:00
...
... my job's logs, indicating that the job is running healthily ...
...
[2020-12-20 11:26:16,587] {local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 11:26:16,593] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 12059
[2020-12-20 11:27:16,609] {process_utils.py:108} WARNING - process psutil.Process(pid=12059, name='airflow task runner: daily_backups run_backupper 2020-12-19T02:00:00+00:00 22', status='sleeping', started='11:26:11') did not respond to SIGTERM. Trying SIGKILL
[2020-12-20 11:27:16,618] {process_utils.py:61} INFO - Process psutil.Process(pid=12059, name='airflow task runner: daily_backups run_backupper 2020-12-19T02:00:00+00:00 22', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='11:26:11') (12059) terminated with exit code Negsignal.SIGKILL
[2020-12-20 11:27:16,618] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGKILL

The final few lines of the logs are not always the same. Here is a different ending, from an earlier failed attempt of the same task:

... same stuff as before ...
[2020-12-20 02:01:12,689] {local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2020-12-20 02:01:12,695] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 24442
[2020-12-20 02:02:00,462] {taskinstance.py:1214} ERROR - Received SIGTERM. Terminating subprocesses.
[2020-12-20 02:02:00,498] {process_utils.py:61} INFO - Process psutil.Process(pid=24442, status='terminated', exitcode=0, started='02:00:10') (24442) terminated with exit code 0
[2020-12-20 02:02:00,499] {local_task_job.py:118} INFO - Task exited with return code 0

I suspect in this case the script was able to respond to the SIGTERM in time, whereas in the previous case it was blocked on a long-running query and was not able to terminate cleanly.
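Independent of Airflow's own SIGTERM handling, here is a minimal general-Python sketch (the function and names below are hypothetical, not the actual backup code) of why a handler only fires once the interpreter regains control; a callable blocked inside a single long C-level call, such as a database query, cannot react until that call returns, by which point the runner may already have escalated to SIGKILL:

import signal
import time

def _handle_sigterm(signum, frame):
    # Hypothetical cleanup: close connections, flush state, then exit cleanly.
    raise SystemExit("received SIGTERM, shutting down cleanly")

def run_backup(**context):
    # The handler can only run when Python regains control between instructions.
    signal.signal(signal.SIGTERM, _handle_sigterm)
    for _ in range(10):
        time.sleep(1)  # small units of work: control returns often, so SIGTERM is honoured quickly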

Asked Dec 20 '20 by abeboparebop

People also ask

How many tasks can Airflow handle?

concurrency: the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. This lets you allow one DAG to run up to 32 tasks at once while another DAG is limited to 16.
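
As a hedged sketch against the Airflow 2.0-era API (the DAG id and task below are placeholders, not taken from the question), the per-DAG cap is passed as the DAG's concurrency argument:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="example_wide_dag",          # placeholder DAG id
    start_date=datetime(2020, 12, 1),
    schedule_interval="@daily",
    concurrency=32,                     # at most 32 task instances of this DAG at once
) as dag:
    PythonOperator(task_id="do_work", python_callable=lambda: None)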

How do you retry a failed task in Airflow?

If you want to re-run a task in Airflow, the best way to do so is to press Clear or Delete (the label depends on the Airflow version you're running), not Run. Hitting this will clear the state of your failed task and allow the scheduler to pick it back up and re-run it.

What is Airflow upstream failed?

With the default trigger rule 'all_success', an upstream failure stops downstream tasks from being executed, since that rule requires all upstream tasks to succeed. Note that Airflow does continue executing tasks which do not have any dependency on the failed task (fetch_weather and process_weather in the referenced example).
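
A minimal sketch of the difference (DAG and task ids are placeholders): with the default all_success rule a downstream task will not run when its parent fails, while trigger_rule="all_done" lets it run once the parent has finished, whatever the outcome:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG("example_trigger_rules", start_date=datetime(2020, 12, 1), schedule_interval=None) as dag:
    failing = PythonOperator(task_id="failing_upstream", python_callable=lambda: 1 / 0)
    needs_success = PythonOperator(task_id="needs_success", python_callable=lambda: print("ran"))  # default: all_success
    runs_anyway = PythonOperator(
        task_id="runs_anyway",
        python_callable=lambda: print("ran"),
        trigger_rule=TriggerRule.ALL_DONE,  # run once upstream has finished, success or not
    )
    failing >> [needs_success, runs_anyway]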


2 Answers

I believe the problem was that the scheduler health check threshold was set to be smaller than the scheduler heartbeat interval.

In my config I had set scheduler_health_check_threshold to 30 seconds and scheduler_heartbeat_sec to 60 seconds. During the check for orphaned tasks (itself governed by a different parameter, orphaned_tasks_check_interval), the scheduler heartbeat was determined to be older than 30 seconds, which makes sense, because it was only heartbeating every 60 seconds. Thus the scheduler was inferred to be unhealthy and was therefore terminated.
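
Expressed as an airflow.cfg excerpt (a reconstruction of only the [scheduler] keys mentioned above, not the full file), the mismatch looks like this:

[scheduler]
# how often the scheduler should heartbeat (seconds)
scheduler_heartbeat_sec = 60
# heartbeats older than this are treated as a sign of an unhealthy scheduler;
# with 30 < 60, every orphaned-tasks sweep can find a "stale" heartbeat
scheduler_health_check_threshold = 30
# the sweep itself runs every orphaned_tasks_check_interval seconds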

Around the time of the failure, I could see messages like these in /var/log/syslog

Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,368] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,373] {scheduler_job.py:1764} INFO - Marked 1 SchedulerJob instances as failed
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,381] {scheduler_job.py:1805} INFO - Reset the following 1 orphaned TaskInstances:
Dec 20 11:26:14 localhost bash[11545]: #011<TaskInstance: daily_backups.run_backupper 2020-12-19 02:00:00+00:00 [running]>
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,571] {scheduler_job.py:938} INFO - 1 tasks up for execution:
Dec 20 11:26:14 localhost bash[11545]: #011<TaskInstance: daily_backups.run_backupper 2020-12-19 02:00:00+00:00 [scheduled]>
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,574] {scheduler_job.py:972} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,575] {scheduler_job.py:999} INFO - DAG daily_backups has 0/16 running and queued tasks
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,575] {scheduler_job.py:1060} INFO - Setting the following tasks to queued state:
Dec 20 11:26:14 localhost bash[11545]: #011<TaskInstance: daily_backups.run_backupper 2020-12-19 02:00:00+00:00 [scheduled]>
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,578] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='daily_backups', task_id='run_backupper', execution_date=datetime.datetime(2020, 12, 19, 2, 0, tzinfo=Timezone('UTC')), try_number=4) to executor with priority 2 and queue default
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,578] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/storage/airflow/dags/backupper/daily_backups.py']
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,581] {local_executor.py:81} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'daily_backups', 'run_backupper', '2020-12-19T02:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/storage/airflow/dags/backupper/daily_backups.py']
Dec 20 11:26:14 localhost bash[11545]: [2020-12-20 11:26:14,707] {dagbag.py:440} INFO - Filling up the DagBag from /storage/airflow/dags/backupper/daily_backups.py
Dec 20 11:26:15 localhost bash[11545]: Running <TaskInstance: daily_backups.run_backupper 2020-12-19T02:00:00+00:00 [queued]> on host localhost

and the timestamps coincide closely with the SIGTERM received by my task. I guess that because the SchedulerJob was marked as failed, the TaskInstance running my actual task was considered an orphan and was therefore marked for termination. At the same time, the scheduler queued a new attempt (try_number=4).

Increasing the scheduler_health_check_threshold to 120 seconds and restarting the scheduler/webserver services appears to have resolved my issue.
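
For reference, a sketch of the corrected [scheduler] section (only the two keys discussed, with the threshold now comfortably above the heartbeat interval):

[scheduler]
scheduler_heartbeat_sec = 60
scheduler_health_check_threshold = 120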

Answered Oct 20 '22 by abeboparebop


I had the same issue.

From logs:

2021-05-07 13:04:19,960 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:09:20,060 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:14:20,186 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:19:20,263 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:24:20,399 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:29:20,729 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:34:20,892 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:39:21,070 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:44:21,328 INFO - Resetting orphaned tasks for active dag runs
2021-05-07 13:49:21,423 INFO - Resetting orphaned tasks for active dag runs

And my scheduler config was as follows:

# Task instances listen for external kill signal (when you clear tasks
# from the CLI or the UI), this defines the frequency at which they should
# listen (in seconds).
job_heartbeat_sec = 5

Answered Oct 20 '22 by Alejandro Kaspar