 

Airflow signals SIGTERM to subprocesses unexpectedly

Tags:

python

airflow

I am using the PythonOperator to call a function that parallelizes a data engineering process as an Airflow task. This is done by wrapping the function in a simple callable wrapper that Airflow invokes.

def wrapper(ds, **kwargs):
    process_data()
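
Simplified, process_data looks roughly like this (the per-file worker and the input file list below are placeholders standing in for the real pipeline):

from multiprocessing import Pool

def process_file(path):
    # placeholder for the real per-file parsing/processing step
    return path.upper()

def process_data():
    files = ["a.csv", "b.csv", "c.csv"]  # placeholder input list
    # spawn a pool of worker subprocesses, as the real code does
    with Pool(processes=4) as pool:
        results = pool.map(process_file, files)
    # combining and sorting of the partial results happens here
    return sorted(results)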

process_data achieves parallelization using the multiprocessing module, which spawns subprocesses. When I run process_data on its own from a Jupyter notebook, it runs to completion with no problem. However, when I run it through Airflow, the task fails and the task log shows something like this:

[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
    res = self._reader.recv_bytes()

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
    return self._semlock.__exit__(*args)

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING -   File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal

[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07

[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85

I am not quite sure why the task receives SIGTERM. My guess is that some higher-level process is sending it to the subprocesses. What should I do to debug this issue?

I just noticed that towards the end of the task log, it clearly states:

airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.

asked Jan 22 '19 by jiminssy


3 Answers

I followed the answer here. The idea is the same: not letting Airflow clean up the task's processes too early.

export AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME=604800 did the trick.
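
For reference, the same option can be set in airflow.cfg instead of the environment; assuming the standard AIRFLOW__<SECTION>__<KEY> mapping, it lives under [core] and controls how long Airflow gives a killed task to clean up before it is forcibly terminated (604800 seconds = 7 days):

[core]
killed_task_cleanup_time = 604800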

answered Nov 04 '22 by Yeu-Chern Harn


I had a similar issue when running multithreaded Python code. I was able to resolve it by joining the threads: the task then waits until every thread has finished before returning, so Airflow does not send SIGTERM to work that is still in progress.

from threading import Thread

def do_work(item):
    # placeholder for the real per-thread workload
    print(item)

threads = []  # collect every worker thread

# Create all threads (do_work and the item list are placeholders for your own work)
for item in ("a", "b", "c"):
    t = Thread(target=do_work, args=(item,))
    threads.append(t)

# Start all threads
for t in threads:
    t.start()

# Wait for all of them to finish before the task callable returns
for t in threads:
    t.join()

answered Nov 04 '22 by manohar amrutkar


DAGs that run for a long time, or that have more than 10-12 (potentially long-running) tasks, seem to have a higher probability of receiving SIGTERM.

PROBABLE SOLUTIONS: Set schedule_after_task_execution to False. If it is True, the task supervisor process runs a "mini scheduler": after marking the task as success it calls _run_mini_scheduler_on_child_tasks, and local_task_job.py, which then sees the task as successful and no longer running, will terminate the process even though it may still be executing inside _run_mini_scheduler_on_child_tasks.

The problem may also be that the scheduler health check threshold is set smaller than the scheduler heartbeat interval. The default scheduler_health_check_threshold is 30 seconds, and scheduler_heartbeat_sec was 60 seconds, so during the check for orphaned tasks the scheduler heartbeat was always found to be older than 30 seconds, which makes sense, because it was only heartbeating every 60 seconds. The scheduler was therefore inferred to be unhealthy and possibly terminated.

In my case the logs show that the SIGTERM is associated with tasks being considered orphaned.

The timestamps coincide closely with the SIGTERM received by the task: since the SchedulerJob was marked as failed, the TaskInstance running the actual task was considered an orphan and was therefore marked for termination. I changed these values to scheduler_heartbeat_sec = 60 and scheduler_health_check_threshold = 120 (twice the heartbeat interval). Some explanations blame the metadata database sitting at 100% CPU while running DAGs with many tasks in parallel, as in my case. Scaling up the database may solve the issue, but instead raising job_heartbeat_sec along with the other three configuration values actually solved the problem: job_heartbeat_sec = 60 (roughly the time taken by the task).
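
Put together, the changes look roughly like this in airflow.cfg (this assumes the standard layout in which all four keys live under the [scheduler] section; tune the values to your own workload):

[scheduler]
schedule_after_task_execution = False
scheduler_heartbeat_sec = 60
scheduler_health_check_threshold = 120
job_heartbeat_sec = 60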

answered Nov 04 '22 by caxefaizan