
How to control the parallelism or concurrency of an Airflow installation?

In some of my Apache Airflow installations, DAGs or tasks that are scheduled to run do not run even when the scheduler doesn't appear to be fully loaded. How can I increase the number of DAGs or tasks that can run concurrently?

Similarly, if my installation is under high load and I want to limit how quickly my Airflow workers pull queued tasks (for example, to reduce resource consumption), what can I adjust to reduce the average load?

hexacyanide asked May 30 '19 02:05



2 Answers

Here's an expanded list of configuration options that are available since Airflow v1.10.2. Some can be set on a per-DAG or per-operator basis, but may also fall back to the setup-wide defaults when they are not specified.


Options that can be specified on a per-DAG basis:

  • concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. Defaults to core.dag_concurrency if not set
  • max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set

Examples:

    from airflow import DAG

    # Only allow one run of this DAG to be running at any given time
    dag = DAG('my_dag_id', max_active_runs=1)

    # Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
    dag = DAG('example2', concurrency=10, max_active_runs=2)

Options that can be specified on a per-operator basis:

  • pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks
  • task_concurrency: concurrency limit for the same task across multiple DAG runs

Example:

    t1 = BaseOperator(task_id='my_task', pool='my_custom_pool', task_concurrency=12)  # task_id is required; 'my_task' is a placeholder
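As a rough mental model of how the per-DAG and per-operator limits above combine, a task instance can start only when every limit that applies to it still has room. The sketch below is plain Python, not Airflow code: the `Limit` type, the `blocking_limit` helper, and the numbers (including a pool size of 5) are hypothetical, and the real scheduler logic is more involved.

```python
from typing import List, NamedTuple, Optional


class Limit(NamedTuple):
    name: str     # which setting this entry models, e.g. "concurrency"
    cap: int      # configured maximum
    running: int  # task instances currently counted against it


def blocking_limit(limits: List[Limit]) -> Optional[str]:
    """Return the name of the first exhausted limit, or None if a task may start."""
    for limit in limits:
        if limit.running >= limit.cap:
            return limit.name
    return None


# Hypothetical snapshot: the DAG and the task have room, but the pool is full.
limits = [
    Limit("concurrency", cap=10, running=4),         # per-DAG limit
    Limit("pool my_custom_pool", cap=5, running=5),  # pool slots (assumed size)
    Limit("task_concurrency", cap=12, running=1),    # per-task limit
]
print(blocking_limit(limits))
```

If scheduled tasks are not starting, checking each limit in turn like this usually shows which setting needs to be raised.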

Options that are specified across an entire Airflow setup:

  • core.parallelism: maximum number of tasks running across an entire Airflow installation
  • core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
  • core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
  • core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
  • scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
  • celery.worker_concurrency: max number of task instances that a worker will process at a time if using CeleryExecutor
  • celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
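The setup-wide options above are written as section.key, which corresponds to a key under the [section] heading in airflow.cfg. Airflow also reads each option from an environment variable named AIRFLOW__{SECTION}__{KEY} (note the double underscores). A minimal sketch of that naming rule follows; the helper name `airflow_env_var` is made up for illustration and is not part of Airflow.

```python
def airflow_env_var(option: str) -> str:
    """Map 'section.key' to the environment variable name Airflow reads."""
    section, key = option.split(".", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


for option in ("core.parallelism", "core.dag_concurrency", "celery.worker_concurrency"):
    print(option, "->", airflow_env_var(option))
```

Setting the variable in the environment of the scheduler and workers overrides the value in airflow.cfg, which is convenient for container deployments.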

hexacyanide answered Sep 21 '22 03:09


An illustration of three major concurrency control variables:

[image: an illustration]

As of Airflow 2.2, the task_concurrency parameter is deprecated in favor of max_active_tis_per_dag.
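If the same DAG file has to work on both sides of that rename, one option is to choose the keyword argument from the installed version. The sketch below is only an illustration: `task_limit_kwarg` and `parse_version` are hypothetical helpers, and the version string is passed in by hand rather than read from Airflow.

```python
from typing import Dict, Tuple


def parse_version(version: str) -> Tuple[int, ...]:
    """Keep only the major and minor components, e.g. '1.10.2' -> (1, 10)."""
    return tuple(int(part) for part in version.split(".")[:2])


def task_limit_kwarg(airflow_version: str, limit: int) -> Dict[str, int]:
    """Return the per-task concurrency kwarg under the name that version expects."""
    if parse_version(airflow_version) >= (2, 2):
        return {"max_active_tis_per_dag": limit}
    return {"task_concurrency": limit}


print(task_limit_kwarg("2.2.0", 12))   # {'max_active_tis_per_dag': 12}
print(task_limit_kwarg("1.10.2", 12))  # {'task_concurrency': 12}
```

The resulting dict can be unpacked into an operator constructor, for example `BaseOperator(task_id='my_task', **task_limit_kwarg(version, 12))`.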

https://airflow.apache.org/docs/stable/faq.html#how-can-my-airflow-dag-run-faster

skwon answered Sep 21 '22 03:09