
How to increase tasks queued per second?

I am trying to diagnose an under-performing Airflow pipeline and am wondering what kind of performance I should expect out of the Airflow scheduler, in terms of something like "tasks scheduled per second".

I have only a few queued jobs, and many of my tasks finish in seconds, so I suspect the scheduler is the limiting component and that it is my fault for having so many quick tasks. Still, I would rather not rewrite my DAGs if it can be avoided.

What can I do to increase the rate at which the scheduler queues tasks?


Pipeline Details

Here is what my current airflow.cfg looks like.

I only have two DAGs running. One is scheduled every 5 min and the other is rarely triggered by the first. I am currently trying to backfill several years at this frequency, but may need to change my approach:

[screenshot of airflow.cfg]

As for worker nodes: I currently have 4 fairly powerful servers running at less than 10% resource usage across disk, network, CPU, RAM, and swap. Toggling 3 of the workers off has no impact on my task throughput, and the server left on barely even registers the change in workload.

asked Feb 01 '18 at 16:02 by 7yl4r

People also ask

Is there a way to limit the number of tasks running at the same time for a specific DAG?

concurrency: This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to set 1 DAG to be able to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.
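
For example, a hedged sketch of how that per-DAG cap might be set when defining a DAG (the DAG id, schedule, and numbers here are made up for illustration):

    from datetime import datetime
    from airflow import DAG

    # Illustrative only: cap how many task instances of this one DAG may run at once.
    dag = DAG(
        dag_id="example_limited_dag",        # hypothetical DAG id
        start_date=datetime(2018, 1, 1),
        schedule_interval="*/5 * * * *",
        concurrency=16,                      # at most 16 task instances of this DAG at a time
        max_active_runs=1,                   # also limits how many runs of this DAG are active
    )

The same limit can also be set installation-wide via dag_concurrency in airflow.cfg, which this per-DAG argument overrides.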

What is queue in Airflow?

queue is an attribute of BaseOperator, so any task can be assigned to any queue. The default queue for the environment is defined in airflow.cfg's celery -> default_queue. This defines the queue that tasks get assigned to when not specified, as well as which queue Airflow workers listen to when started.
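
As a rough illustration (the queue name and task below are hypothetical), a task can be pinned to a queue via the operator's queue argument, and a worker is told which queue to listen to when it is started:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG("example_queue_dag", start_date=datetime(2018, 1, 1), schedule_interval=None)

    heavy_task = BashOperator(
        task_id="heavy_task",
        bash_command="echo 'runs only on workers listening to the gpu queue'",
        queue="gpu",          # hypothetical queue name; defaults to [celery] default_queue
        dag=dag,
    )

    # A Celery worker subscribes to that queue when started, e.g.:
    #   airflow worker -q gpu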

What is a DAG for Airflow?

DAGs. In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.
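
A bare-bones sketch of such a script, with made-up task ids and schedule, might look like this:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    # Two placeholder tasks and one dependency, purely for illustration.
    with DAG("example_dag", start_date=datetime(2018, 1, 1), schedule_interval="@daily") as dag:
        extract = DummyOperator(task_id="extract")
        load = DummyOperator(task_id="load")
        extract >> load   # load runs only after extract succeeds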

What is Start_date in Airflow DAG?

Every DAG has its schedule; start_date is simply the date from which a DAG is considered by the Airflow scheduler. It also lets developers release a DAG before its production date. You could set up start_date more dynamically before Airflow 1.8.
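
A minimal, hedged example of a fixed start_date passed through default_args (the values are placeholders, not recommendations):

    from datetime import datetime
    from airflow import DAG

    default_args = {
        "owner": "airflow",
        "start_date": datetime(2018, 1, 1),   # first date the scheduler will consider
    }

    dag = DAG("example_start_date_dag", default_args=default_args, schedule_interval="@daily")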


1 Answer

There are a number of config values in your airflow.cfg that could be related to this.

Under [core]:

  • parallelism: Total number of task instances that can run at once.
  • dag_concurrency: Limit of task instances that can run concurrently for a single DAG; you may need to bump this if you have many parallel tasks. Can be overridden when defining a DAG.
  • non_pooled_task_slot_count: Limit of tasks without a pool configured that can run at once.
  • max_active_runs_per_dag: The maximum number of active DAG runs per DAG. This matters if you're triggering runs manually or if there's a backlog of DAG runs scheduled with a short interval. Can be overridden when defining a DAG.
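
For concreteness, a sketch of what those [core] knobs might look like in airflow.cfg; the numbers below are placeholders to experiment with, not recommendations:

    [core]
    # task instances that may run at once across the whole installation
    parallelism = 128
    # task instances per DAG (overridable per DAG via its concurrency argument)
    dag_concurrency = 32
    # concurrent tasks that have no pool assigned
    non_pooled_task_slot_count = 128
    # active DAG runs per DAG (overridable per DAG via max_active_runs)
    max_active_runs_per_dag = 16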

Under [scheduler]:

  • scheduler_heartbeat_sec: Defines how often the scheduler runs; try it out with lower values.
  • min_file_process_interval: Process each file at most once every N seconds. Set to 0 to never limit how often you process a file.
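
Again purely as an illustrative sketch (values are starting points to tune, not recommendations):

    [scheduler]
    # seconds between scheduler loops; lower means tasks get queued sooner
    scheduler_heartbeat_sec = 1
    # minimum seconds between re-parsing the same DAG file; 0 disables the limit
    min_file_process_interval = 0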

Under [celery]:

  • celeryd_concurrency: Number of worker processes Celery will run with, so essentially the number of task instances a worker can take at once. Matching the number of CPUs is a popular starting point, but it can definitely go higher.

The last one only applies if you're using the CeleryExecutor, which I'd definitely recommend if you're looking to increase your task throughput.
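
If you do switch to (or are already on) the CeleryExecutor, the relevant bits would look roughly like the sketch below. Section and option names are as in Airflow 1.x (newer releases rename celeryd_concurrency to worker_concurrency), and the value is only a placeholder:

    [core]
    executor = CeleryExecutor

    [celery]
    # number of task instances each worker process will take at once
    celeryd_concurrency = 16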

answered Sep 29 '22 at 21:09 by Daniel Huang