
Airflow tasks get stuck in "queued" status and never start running


I'm using Airflow v1.8.1 and run all components (worker, web, flower, scheduler) on Kubernetes and Docker. I use the Celery Executor with Redis, and my tasks look like this:

(start) -> (do_work_for_product1)
     ├  -> (do_work_for_product2)
     ├  -> (do_work_for_product3)
     ├  …

So the start task has multiple downstream tasks. I set up the concurrency-related configuration as follows:

parallelism = 3
dag_concurrency = 3
max_active_runs = 1
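To make the intent of these limits concrete, here is a minimal sketch of how the first two combine. It assumes both keys sit under `[core]` in airflow.cfg; the cfg text and the helper name `effective_task_slots` are illustrative and not part of Airflow. With these values, at most three task instances of this DAG should run at once, so further downstream tasks have to wait for a free slot, but they should not wait forever.

```python
import configparser

# Illustrative airflow.cfg fragment using the values from the question.
# Placing both keys under [core] is an assumption about the file layout.
CFG_TEXT = """
[core]
parallelism = 3
dag_concurrency = 3
"""


def effective_task_slots(cfg_text):
    """Return the most task instances a single DAG could run at once.

    Hypothetical helper: it takes the smaller of the installation-wide
    limit (parallelism) and the per-DAG limit (dag_concurrency).
    """
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    parallelism = parser.getint("core", "parallelism")
    dag_concurrency = parser.getint("core", "dag_concurrency")
    return min(parallelism, dag_concurrency)


slots = effective_task_slots(CFG_TEXT)
print(slots)
```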

Then, when I run this DAG manually (I'm not sure whether it also happens on a scheduled run), some downstream tasks get executed, but others get stuck in "queued" status.

If I clear the task from the Admin UI, it gets executed. There is no worker log: after processing the first few downstream tasks, the worker simply stops writing any log output.

Web server's log (I'm not sure whether the worker exiting is related):

/usr/local/lib/python2.7/dist-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2017-08-24 04:20:56,496] [51] {models.py:168} INFO - Filling up the DagBag from /usr/local/airflow_dags
[2017-08-24 04:20:57 +0000] [27] [INFO] Handling signal: ttou
[2017-08-24 04:20:57 +0000] [37] [INFO] Worker exiting (pid: 37)

There is no error in the scheduler log either, and the number of tasks that get stuck changes every time I try this.

Because I also use Docker, I'm wondering if this is related: https://github.com/puckel/docker-airflow/issues/94. But so far, I have no clue.

Has anyone faced a similar issue, or does anyone have an idea of what I could investigate?

Norio Akagi asked Aug 24 '17 04:08



2 Answers

Tasks getting stuck is, most likely, a bug. At the moment (<= 1.9.0alpha1) it can happen when a task cannot even start up on the (remote) worker. This happens for example in the case of an overloaded worker or missing dependencies.

This patch should resolve that issue.

It is worth investigating why your tasks do not reach the RUNNING state. Setting itself to this state is the first thing a task does. Normally the worker logs before it starts executing, and it also reports any errors. You should be able to find these entries in the task log.

edit: As was mentioned in the comments on the original question, one example of Airflow being unable to run a task is when it cannot write to required locations. This leaves it unable to proceed, and tasks get stuck. The patch fixes this by failing the task from the scheduler.
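One way to rule out the write-permission case is to check, from inside the worker container, that the directories Airflow writes to are writable by the user running the worker. The following is a minimal sketch, assuming you supply the paths yourself (for example the `base_log_folder` set in airflow.cfg). The helper `unwritable_dirs` is hypothetical, and the demo uses a temporary directory rather than real Airflow paths.

```python
import os
import tempfile


def unwritable_dirs(paths):
    """Return the paths that are missing or not writable by this process."""
    return [
        p for p in paths
        if not (os.path.isdir(p) and os.access(p, os.W_OK))
    ]


# Hypothetical demo: one directory that exists and one that does not.
with tempfile.TemporaryDirectory() as tmp:
    missing = os.path.join(tmp, "does_not_exist")
    problems = unwritable_dirs([tmp, missing])
    print(problems)
```

Run it as the same user as the worker process; a check run as root will typically report every existing directory as writable.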

Bolke de Bruin answered Oct 19 '22 23:10


I have been working with the same Docker image (puckel). My issue was resolved by replacing

 result_backend = db+postgresql://airflow:airflow@postgres/airflow 

with

celery_result_backend = db+postgresql://airflow:airflow@postgres/airflow 

which I think has been updated in the latest pull by puckel. The change was reverted around Feb 2018, and your comment was made in January.
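To see which of the two key names your own configuration actually uses, a quick check along these lines may help. This is only a sketch: the cfg text is an illustrative fragment, the assumption that the key sits in a `[celery]` section should be verified against your file, and `result_backend_keys` is a hypothetical helper.

```python
import configparser

# Illustrative [celery] section; the connection string is the one shown above.
CFG_TEXT = """
[celery]
result_backend = db+postgresql://airflow:airflow@postgres/airflow
"""


def result_backend_keys(cfg_text):
    """Return which of the two candidate key names appear under [celery]."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    candidates = ("celery_result_backend", "result_backend")
    return [key for key in candidates if parser.has_option("celery", key)]


found = result_backend_keys(CFG_TEXT)
print(found)
```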

Rohan Sawant answered Oct 19 '22 22:10