
How to get Airflow to add thousands of tasks to Celery at one time?

I'm evaluating Airflow 1.9.0 for our distributed orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing something strange.

I made a DAG that has three stages:

  1. start
  2. fan out and run N tasks concurrently
  3. finish

N can be large, maybe up to 10K. I would expect to see N tasks get dumped onto the Rabbit queue when stage 2 begins. Instead, I am seeing only a few hundred added at a time. As the workers process the tasks and the queue shrinks, more get added to Celery/Rabbit. Eventually it does finish; however, I would really prefer that it dump ALL the work (all 10K tasks) into Celery immediately, for two reasons:

  1. The current way makes the scheduler long-lived and stateful. The scheduler might die after only 5K have completed, in which case the remaining 5K tasks would never get added (I verified this).

  2. I want to use the size of the Rabbit queue as a metric to trigger autoscaling events that add more workers. So I need a true picture of how much outstanding work remains (10K, not a few hundred).

I assume the scheduler has some kind of throttle that keeps it from dumping all 10K messages simultaneously? If so, is this configurable?

FYI, I have already set "parallelism" to 10K in airflow.cfg.
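
For reference, that setting lives under [core] in airflow.cfg, so the entry looks roughly like this:

[core]
parallelism = 10000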

Here is my test dag:

# This dag tests how well airflow fans out

from airflow import DAG
from datetime import datetime, timedelta

from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 10000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag
)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

# fan out: create num_tasks parallel tasks, each running between 'starting' and 'all_done'
for i in range(0, num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)
asked Jun 26 '18 by Kevin Pauli

People also ask

How many tasks can an Airflow worker handle?

concurrency: This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to set one DAG to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.

How many tasks can run in parallel in Airflow?

Parallelism: This is the maximum number of tasks that can run at the same time in a single Airflow environment. If this setting is set to 32, for example, no more than 32 tasks can run concurrently across all DAGs.

How do you set up Airflow with Celery?

To set up the Airflow Celery Executor, first you need to set up an Airflow Celery backend using a message broker service such as RabbitMQ or Redis. After that, you need to change the airflow.cfg file to point the executor parameter to CeleryExecutor and enter all the required configuration for it.
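
A rough sketch of that airflow.cfg change, using Airflow 1.9 key names and placeholder connection URLs:

[core]
executor = CeleryExecutor

[celery]
# placeholder RabbitMQ broker URL
broker_url = amqp://guest:guest@localhost:5672//
# placeholder backend where Celery stores task state
celery_result_backend = db+postgresql://airflow:airflow@localhost/airflow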

How does Apache Airflow distribute jobs to Celery workers?

Airflow can distribute tasks across multiple workers by using a protocol to transfer jobs from the main application to Celery workers. It relies on a message broker to transfer the messages.


2 Answers

There are a couple of other settings you'll want to increase.

Under [core], increase non_pooled_task_slot_count. This will allow more tasks to actually be queued up in Celery.

Under [celery], increase celeryd_concurrency. This will increase the number of tasks each worker will attempt to run from the queue at the same time.
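
A sketch of what those two changes might look like in airflow.cfg (the values are illustrative; 10000 matches the fan-out size in the question, and 64 is just an example worker concurrency):

[core]
# task slots available to tasks that aren't assigned to a pool
non_pooled_task_slot_count = 10000

[celery]
# how many task instances each Celery worker will run at the same time
celeryd_concurrency = 64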

That being said, in response to your first reason...

It's true that the remaining tasks won't get queued if the scheduler isn't running, but that's because the Airflow scheduler is designed to be long-lived. It should always be running while your workers are running. Should the scheduler be killed or die for whatever reason, once it starts back up it will pick up where it left off.

answered Sep 29 '22 by cwurtz

Thanks to those who suggested other concurrency settings. Through trial and error I learned that I need to set all three of these:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
 - AIRFLOW__CORE__DAG_CONCURRENCY=10000

With only these two enabled, I can get to 10K, but it is very slow, adding only 100 new tasks in bursts every 30 seconds, in a stair-step fashion:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000

If I only enable these two, it is the same "stair-step" pattern, with 128 added every 30 seconds:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__DAG_CONCURRENCY=10000

But if I set all three, it does add 10K to the queue in one shot.
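
For reference, those environment variables follow Airflow's AIRFLOW__SECTION__KEY naming convention, so the same three settings could equivalently go in airflow.cfg:

[core]
parallelism = 10000
non_pooled_task_slot_count = 10000
dag_concurrency = 10000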

answered Sep 29 '22 by Kevin Pauli