Let's say I set celery concurrency to n, but my DAG contains m (m > n) ExternalTaskSensor tasks, each checking another DAG named do_sth. These sensors occupy every celery worker slot, so no real work gets done.
But I can't set the concurrency too high (say 2*m), because the do_sth DAG may start too many processes and run out of memory.
I am confused about what number I should set celery concurrency to.
In ETL best practices with Airflow's Gotchas section the author addresses this general problem. One of the suggestions is to set up a pool for your sensor tasks so that your other tasks don't get starved. For your situation, determine the number of sensor tasks that you want running at one time (less than your concurrency level) and set up a pool with that as the limit. Once your pool is set up, pass the pool argument to each of your sensor operators. For more on pools see Airflow's documentation on concepts. Here is an example of passing a pool argument to an operator:
from datetime import timedelta
from airflow.operators.bash_operator import BashOperator

# This task only runs while a slot in its pool is free, so the pool size
# caps how many of these tasks occupy celery workers at once.
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd, dag=dag)
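Applied to your case, the same idea looks roughly like the sketch below: create a pool (for example via the Admin -> Pools UI) with fewer slots than your celery concurrency, and pass it to every ExternalTaskSensor. The pool name dag_sensor_pool, the DAG id wait_for_do_sth, and the poke_interval/timeout values here are placeholder assumptions, and the exact import path for ExternalTaskSensor depends on your Airflow version:

from datetime import datetime
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag = DAG('wait_for_do_sth',
          start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

# Each sensor holds one slot in 'dag_sensor_pool' while it pokes, so at most
# <pool size> sensors run at a time and the remaining celery workers stay
# free for the do_sth tasks themselves.
wait_for_do_sth = ExternalTaskSensor(
    task_id='wait_for_do_sth',
    external_dag_id='do_sth',
    pool='dag_sensor_pool',   # pool created with fewer slots than concurrency
    poke_interval=60,
    timeout=60 * 60,
    dag=dag)

With this in place, celery concurrency only needs to be a bit larger than the pool size plus the number of non-sensor tasks you want running concurrently, rather than scaling with m.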