I am using the PythonOperator to call a function that parallelizes a data engineering process as an Airflow task. This is done simply by wrapping the function in a callable wrapper function that Airflow calls:
def wrapper(ds, **kwargs):
    process_data()
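For context, the wrapper is wired into the DAG roughly like this (a minimal sketch; the DAG name, schedule, and operator arguments are stand-ins, not my actual code):

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("process_data_dag", start_date=datetime(2019, 1, 1), schedule_interval="@daily")

process_task = PythonOperator(
    task_id="process_data",
    python_callable=wrapper,
    provide_context=True,  # Airflow 1.x: pass ds and **kwargs into wrapper
    dag=dag,
)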
process_data achieves parallelization using the multiprocessing module, which spawns subprocesses. When I run process_data on its own from a Jupyter notebook, it runs to the end with no problem. However, when I run it using Airflow, the task fails and the log for the task shows something like this:
[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
raise AirflowException("Task received SIGTERM signal")
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
task = get()
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
res = self._reader.recv_bytes()
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
return self._semlock.__exit__(*args)
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
raise AirflowException("Task received SIGTERM signal")
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07
[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85
I am not quite sure why the task receives SIGTERM. My guess is that some higher-level process is sending it to the subprocesses. What should I do to debug this issue?
I just noticed that, toward the end of the task log, it clearly states:
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.
I followed the answer here. The idea is the same: don't let Airflow clean up the task's subprocesses too early. Setting
export AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME=604800
did the trick.
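For reference, the same setting can also go in airflow.cfg (the mapping from the AIRFLOW__CORE__... environment variable to the [core] section is the standard convention):

[core]
killed_task_cleanup_time = 604800

604800 seconds is seven days, i.e. a task that receives SIGTERM gets a very long window to clean up before being force-killed.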
I too had a similar issue when I was running multithreaded Python code. I was able to resolve it by joining the threads: Airflow then waits until all threads have finished executing before sending SIGTERM.
from threading import Thread

threads = []  # collect all threads here
t = Thread(...)  # create each worker thread
threads.append(t)  # add every thread to the list

# Start all threads
for x in threads:
    x.start()

# Wait for all of them to finish before the task function returns
for x in threads:
    x.join()
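The question uses multiprocessing rather than threading, but the same principle should apply: make sure the task callable blocks until every worker is done. A minimal sketch, assuming process_data fans work out over a Pool (handle_file, file_list, and the pool size are made-up names for illustration):

from multiprocessing import Pool

def handle_file(path):
    ...  # hypothetical per-file processing

def process_data(file_list):
    # pool.map blocks until every worker has returned its result,
    # so the Airflow task function does not return while work is still in flight
    with Pool(processes=4) as pool:
        return pool.map(handle_file, file_list)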
DAGs that run longer, or that have more than 10-12 tasks (which may themselves run long), seem to have a higher probability of receiving SIGTERM.
PROBABLE SOLUTIONS:
Set schedule_after_task_execution to False.
If it is True, the task supervisor process runs a "mini scheduler": after marking the task as success, it calls _run_mini_scheduler_on_child_tasks. Meanwhile, local_task_job.py detects the task as successful and no longer running, and terminates the process, which might still be executing under _run_mini_scheduler_on_child_tasks.
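Assuming the setting lives in the [scheduler] section (as it does in recent Airflow versions), it can be disabled in airflow.cfg:

[scheduler]
schedule_after_task_execution = False

or, equivalently, via the environment variable:

export AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=False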
The problem may also be that scheduler_health_check_threshold is set smaller than scheduler_heartbeat_sec. The default scheduler_health_check_threshold was 30 seconds, while my scheduler_heartbeat_sec was 60 seconds. During the check for orphaned tasks, the scheduler heartbeat was determined to be older than 30 seconds, which makes sense, because it was only heartbeating every 60 seconds. The scheduler was therefore inferred to be unhealthy and was possibly terminated.
The logs in my case show that the SIGTERM is associated with tasks being considered orphaned, and the timestamps coincide closely with the SIGTERM received by the task. It seems that since the SchedulerJob was marked as failed, the TaskInstance running the actual task was considered an orphan and was therefore marked for termination. I changed these values to:
scheduler_heartbeat_sec = 60
scheduler_health_check_threshold = 120 (twice scheduler_heartbeat_sec)
Some explanations blame the metadata database sitting at 100% CPU while DAGs run many tasks in parallel, as in my case. Scaling up the database may solve the issue, but instead, increasing job_heartbeat_sec along with the other three configurations above actually solved the problem:
job_heartbeat_sec = 60
(chosen roughly based on the time the task takes)
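Putting the four settings together in airflow.cfg (these are the values that worked for me; treat them as a starting point, and verify the [scheduler] placement against your Airflow version):

[scheduler]
job_heartbeat_sec = 60
scheduler_heartbeat_sec = 60
scheduler_health_check_threshold = 120
schedule_after_task_execution = False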