I have a SubDAG in Airflow with a long-running step (typically about 2 hours, though it varies depending on which unit is being run). Under 1.7.1.3, this step would consistently trigger AIRFLOW-736, and the SubDAG would stall in the 'running' state even though all steps within it had succeeded. Because we had no steps after the SubDAG, we could work around this by manually marking the SubDagOperator as successful (rather than running) in the database.
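The manual workaround amounts to a single UPDATE against the task_instance table. The sketch below assumes a DB-API connection to the Airflow metadata database and uses the task_instance columns dag_id, task_id, execution_date and state. The helper names are hypothetical, and the in-memory sqlite3 table is only a stand-in with a few columns, not Airflow's full schema. The `state = 'running'` guard leaves rows that have already moved on untouched.

```python
import sqlite3


def mark_running_task_success(conn, dag_id, task_id, execution_date):
    """Hypothetical helper: flip a stuck 'running' task_instance row to 'success'.

    `conn` is any DB-API connection to the metadata database. The '?'
    placeholders are sqlite3's paramstyle; MySQLdb and psycopg2 expect '%s'.
    Returns the number of rows changed (0 if the row was not 'running')."""
    cur = conn.cursor()
    cur.execute(
        "UPDATE task_instance SET state = 'success' "
        "WHERE dag_id = ? AND task_id = ? AND execution_date = ? "
        "AND state = 'running'",
        (dag_id, task_id, execution_date),
    )
    conn.commit()
    return cur.rowcount


def stand_in_db():
    """In-memory stand-in holding only the columns used here."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance "
        "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT, hostname TEXT)"
    )
    return conn
```

For the SubDagOperator in the sample DAG, the row to update would be dag_id 'long_runner' with task_id 'long_runner_sub'.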
We're testing Airflow 1.8.1 now, upgrading by doing the following:
With the system otherwise untouched, the same DAG now fails 100% of the time, roughly when the long-running task hits the 1-hour mark (though oddly, not exactly 3600 seconds in; it can be anywhere from 30 to 90 seconds after the hour ticks), with the message "Executor reports task instance finished (failed) although the task says its running. Was the task killed externally?". However, the task itself keeps running on the worker unabated. Somehow the scheduler and the worker disagree: based on the database, the scheduler concludes that the task has failed (see this line of jobs.py), even though the task is actually running fine.
I've confirmed that the state is somehow 'failed' in the task_instance table of the Airflow database. So I'd like to know what could be setting the task state to failed while the task itself is still running.
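Checking the recorded state can be scripted as a read-only query. This is a minimal sketch, assuming a DB-API connection to the metadata database and the task_instance columns dag_id, task_id, execution_date, state and hostname; the helper names are hypothetical and the sqlite3 table is only a stand-in. Note that the task inside the SubDAG is stored under the SubDAG's own dag_id ('long_runner.long_runner_sub' in the sample below), not under the parent's.

```python
import sqlite3


def task_states(conn, dag_id, task_id):
    """Hypothetical helper: list (execution_date, state, hostname) for one
    task, newest run first. '?' is sqlite3's paramstyle; MySQLdb and
    psycopg2 expect '%s'."""
    cur = conn.cursor()
    cur.execute(
        "SELECT execution_date, state, hostname FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? ORDER BY execution_date DESC",
        (dag_id, task_id),
    )
    return cur.fetchall()


def stand_in_db():
    """In-memory stand-in holding only the columns queried above."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance "
        "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT, hostname TEXT)"
    )
    return conn
```

The hostname column shows which worker last recorded the task instance, which is worth comparing with the worker where the sleep is actually still running.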
Here's a sample DAG that triggers the issue:

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

# Use 5, not 05: a leading-zero integer literal is a SyntaxError in Python 3.
DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}


def define_sub(dag, step_name, sleeptime):
    # A single task that just sleeps for `sleeptime` seconds on the "model" queue.
    BashOperator(
        task_id=step_name,
        bash_command='sleep %i' % sleeptime,
        queue="model",
        dag=dag,
    )
    return dag


def gen_sub_dag(parent_name, step_name, sleeptime):
    # SubDAG ids follow the "<parent_dag_id>.<subdag_task_id>" convention.
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub


long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

# 7500 s (about 2 h 5 min) runs well past the 1-hour mark where the failure appears.
long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500),
    task_id='long_runner_sub',
    dag=long_runner_parent,
)
```
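If you are indeed running Celery with Redis as the broker, have a look at Celery's visibility timeout setting and increase it beyond the expected end time of your task. Redis's default visibility timeout is one hour, which lines up with when your task fails: a message that has not been acknowledged within that window is redelivered to another worker.
Although we configure Celery with acks_late (so a message is only acknowledged after the task finishes), it still has issues with tasks disappearing. We consider this a bug in Celery.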
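The timing argument can be checked with a few lines of arithmetic. This is a sketch assuming Redis's documented 1-hour default and acks_late being on; the helper names and the 1.5x safety margin are hypothetical choices. Where the resulting dict goes depends on your versions: Celery 4 reads it from the `broker_transport_options` setting (older releases call it `BROKER_TRANSPORT_OPTIONS`), and this sketch does not assume any particular airflow.cfg option for it.

```python
# Celery's documented default visibility timeout for the Redis transport (1 hour).
REDIS_DEFAULT_VISIBILITY_TIMEOUT_S = 3600


def will_be_redelivered(task_runtime_s, visibility_timeout_s=REDIS_DEFAULT_VISIBILITY_TIMEOUT_S):
    """With acks_late the broker message is acknowledged only after the task
    finishes, so a task running longer than the visibility timeout is still
    unacknowledged when the timeout expires and gets redelivered."""
    return task_runtime_s > visibility_timeout_s


def transport_options_for(longest_task_s, margin=1.5):
    """Hypothetical helper: pick a visibility timeout comfortably above the
    longest expected task, in the dict shape Celery's broker transport
    options take."""
    return {'visibility_timeout': int(longest_task_s * margin)}


# The sample DAG sleeps for 7500 s, well past the 3600 s default.
print(will_be_redelivered(7500))    # True
print(transport_options_for(7500))  # {'visibility_timeout': 11250}
```

A margin above 1 leaves room for runs that take longer than usual, since the runtime varies by unit.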