I'm evaluating Airflow 1.9.0 for our distributed orchestration needs (using CeleryExecutor and RabbitMQ), and I am seeing something strange.
I made a DAG that has three stages:
1. a single starting task
2. fan out and run N tasks in parallel
3. a single finishing task that runs after all N are done
N can be large, maybe up to 10K. I would expect to see all N tasks get dumped onto the Rabbit queue when stage 2 begins. Instead I am seeing only a few hundred added at a time; as the workers process tasks and the queue shrinks, more get added to Celery/Rabbit. Eventually it does finish, but I would really prefer that it dump ALL the work (all 10K tasks) into Celery immediately, for two reasons:
- The current behavior makes the scheduler long-lived and stateful. The scheduler might die after only 5K tasks have completed, in which case the remaining 5K would never get queued (I verified this).
- I want to use the size of the Rabbit queue as a metric to trigger autoscaling events that add more workers, so I need a true picture of how much outstanding work remains (10K, not a few hundred); see the sketch after this list.
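For illustration, here is a rough sketch of how that queue depth could be read from the RabbitMQ management API (the host, credentials, vhost, and the queue name 'default' are placeholders for my setup):

# Sketch: read the Celery queue depth from the RabbitMQ management plugin.
# Host, credentials, vhost and queue name are placeholders - adjust to your broker.
import requests

def rabbit_queue_depth(host='localhost', vhost='%2F', queue='default'):
    url = 'http://{}:15672/api/queues/{}/{}'.format(host, vhost, queue)
    resp = requests.get(url, auth=('guest', 'guest'))
    resp.raise_for_status()
    return resp.json()['messages']

print('outstanding tasks: {}'.format(rabbit_queue_depth()))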
I assume the scheduler has some kind of throttle that keeps it from dumping all 10K messages onto the queue simultaneously? If so, is this configurable?
FYI I have already set “parallelism” to 10K in the airflow.cfg
Here is my test dag:
# This dag tests how well airflow fans out
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 10000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

for i in range(0, num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)
concurrency: the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. This lets you allow one DAG to run 32 tasks at once while another DAG is limited to 16 tasks at once.
parallelism: the maximum number of tasks that can run at the same time across the entire Airflow installation. If this is set to 32, for example, no more than 32 tasks can run concurrently across all DAGs.
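For illustration, both limits are plain settings under [core] in airflow.cfg (the values below are only examples):

[core]
# upper bound on running task instances across the whole installation
parallelism = 32
# default per-DAG limit; can be overridden per DAG via the DAG's concurrency argument
dag_concurrency = 16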
To set up the Airflow CeleryExecutor, you first need a Celery backend using a message broker service such as RabbitMQ or Redis. Then you change airflow.cfg to point the executor setting to CeleryExecutor and fill in the required Celery configuration. The CeleryExecutor can distribute tasks across multiple workers, relying on the message broker to transfer jobs from the scheduler to the Celery workers.
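A minimal airflow.cfg sketch for Airflow 1.9 might look like this (the connection strings are placeholders, and some key names changed in later Airflow versions):

[core]
executor = CeleryExecutor

[celery]
# placeholder connection strings - point these at your broker and metadata DB
broker_url = amqp://guest:guest@rabbitmq:5672//
celery_result_backend = db+postgresql://airflow:airflow@postgres/airflow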
There are a couple other settings you'll want to increase.

Under [core], increase non_pooled_task_slot_count. This will allow more tasks to actually be queued up in Celery.

Under [celery], increase celeryd_concurrency. This will increase the number of tasks each worker will attempt to run from the queue at the same time.
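Concretely, the two settings live in airflow.cfg roughly like this (the values are only illustrative; tune them to your environment):

[core]
# how many task instances may be queued or running outside of named pools
non_pooled_task_slot_count = 10000

[celery]
# how many task processes each Celery worker runs in parallel
celeryd_concurrency = 16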
That being said, in response to your first reason...
It is true that the remaining tasks won't get queued if the scheduler isn't running, but that is because the Airflow scheduler is designed to be long-lived. It should always be running while your workers are running. Should the scheduler be killed or die for whatever reason, once it starts back up it will pick up where it left off.
Thanks to those who suggested other concurrency settings. Through trial and error I learned that I need to set all three of these:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
With only these two enabled, I can eventually get to 10K, but it is very slow, adding only 100 new tasks in bursts every 30 seconds, in a stair-step fashion:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
If I only enable these two, it is the same "stair-step" pattern, with 128 added every 30 seconds:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
But if I set all three, it does add 10K to the queue in one shot.
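For anyone reproducing this: the AIRFLOW__SECTION__KEY environment variables simply override the matching airflow.cfg entries, so the same limits can be set either way. A sketch of the environment-variable route (restart the scheduler and workers afterwards so the new limits take effect):

# these override parallelism, non_pooled_task_slot_count and dag_concurrency under [core]
export AIRFLOW__CORE__PARALLELISM=10000
export AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
export AIRFLOW__CORE__DAG_CONCURRENCY=10000

airflow scheduler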