Airflow - long running task in SubDag marked as failed after an hour

Tags: python, airflow

I have a SubDAG in Airflow with a long-running step (typically about 2 hours, though it varies based on which unit is being run). Under 1.7.1.3, this step would consistently trigger AIRFLOW-736 and the SubDAG would stall in the 'running' state even when all the steps within it had succeeded. Since we had no steps after the SubDAG, we could work around this by manually marking the SubDagOperator as successful (rather than running) in the database.
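For reference, this is roughly the shape of the manual fix (a sketch only: it assumes direct access to the Airflow metadata database via the default SQLAlchemy session, and the dag_id/task_id values are just the ones from the sample DAG further down):

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

# Illustrative only: flip the stuck SubDagOperator's task instance from
# 'running' to 'success' once its inner steps have finished.
session = settings.Session()
ti = (session.query(TaskInstance)
      .filter(TaskInstance.dag_id == 'long_runner',
              TaskInstance.task_id == 'long_runner_sub')
      .order_by(TaskInstance.execution_date.desc())
      .first())
ti.state = State.SUCCESS
session.commit()
session.close()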

We're testing Airflow 1.8.1 now, upgrading by doing the following:

  1. Shutting down our scheduler and workers
  2. Via pip, uninstalling airflow and installing apache-airflow (version 1.8.1)
  3. Running airflow upgradedb
  4. Running the airflow scheduler and workers

With the system otherwise untouched, the same DAG is now failing 100% of the time once the long-running task passes the 1 hour mark (though oddly, not exactly 3600 seconds in - it can be anywhere from 30 to 90 seconds after the hour ticks), with the message "Executor reports task instance finished (failed) although the task says its running. Was the task killed externally?". However, the task itself continues running on the worker unabated. Somehow, based on the database state, the scheduler is mistaken in thinking the task has failed (see this line of jobs.py), even though the actual task is running fine.

I've confirmed that, somehow, the state is 'failed' in the task_instance table of the airflow database. Thus, I'd like to know what could be setting the task state to failed when the task itself is still running.
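For what it's worth, this is roughly how I check the recorded state (a sketch assuming the default SQLAlchemy session; the IDs match the sample DAG below):

from airflow import settings
from airflow.models import TaskInstance

# Illustrative check of what the scheduler has recorded for the inner task.
session = settings.Session()
for ti in (session.query(TaskInstance)
           .filter(TaskInstance.dag_id == 'long_runner.long_runner_sub',
                   TaskInstance.task_id == 'long_runner_sub')):
    print(ti.execution_date, ti.state)  # shows 'failed' while the worker is still busy
session.close()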

Here's a sample dag which triggers the issue:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}

def define_sub(dag, step_name, sleeptime):
    # Single long-sleeping task standing in for our real ~2 hour step.
    BashOperator(
        task_id=step_name, bash_command='sleep %i' % sleeptime, queue="model", dag=dag
    )
    return dag

def gen_sub_dag(parent_name, step_name, sleeptime):
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
asked May 31 '17 by J. Doe


1 Answer

If you are indeed running with Celery and Redis, have a look at the visibility timeout setting for Celery and increase it beyond the expected end time of your task.
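For example, something along these lines in the Celery configuration (a sketch only; the 18000-second value is arbitrary, just comfortably above a ~2 hour task, and how you wire it into Airflow 1.8.1's CeleryExecutor setup is an assumption, not shown here):

# celeryconfig.py (illustrative): make the Redis visibility timeout longer
# than the longest-running task, so unacknowledged tasks are not redelivered
# and reported as failed while the original worker is still busy.
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000}  # 5 hours, example value
# (newer Celery versions use the lowercase name broker_transport_options)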

Although we configure Celery to acknowledge tasks late (acks_late), it still has issues with tasks disappearing. We consider this a bug in Celery.

answered Nov 15 '22 by Bolke de Bruin