 

Will Airflow Celery workers be blocked if the number of sensors is larger than the concurrency?

Let's say I set the Celery concurrency to n, but I have m (m > n) ExternalTaskSensor tasks in a DAG, each checking another DAG named do_sth. These ExternalTaskSensor tasks will consume all the Celery worker slots, so that in fact nothing else gets to run.

But I can't set the concurrency too high (like 2*m), because the DAG do_sth may start too many processes, which can lead to running out of memory.

I am confused about what value I should set for the Celery concurrency.
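A minimal sketch of the setup described above, assuming Airflow 1.x import paths; the DAG name, the task ids, and m = 10 are illustrative placeholders, not the real names:

from datetime import datetime

from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='wait_for_do_sth',
    start_date=datetime(2017, 2, 1),
    schedule_interval='@daily',
)

# m sensors, each polling the do_sth DAG; if m is larger than the Celery
# worker concurrency n, every worker slot can end up occupied by a sensor
# that only polls and waits, so nothing else runs.
for i in range(10):
    ExternalTaskSensor(
        task_id='wait_for_do_sth_{}'.format(i),
        external_dag_id='do_sth',
        external_task_id='do_sth_task',  # placeholder task id in do_sth
        poke_interval=60,
        dag=dag,
    )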

asked Feb 28 '17 by MoreFreeze

1 Answer

In the Gotchas section of ETL best practices with Airflow, the author addresses this general problem. One of the suggestions is to set up a pool for your sensor tasks so that your other tasks don't get starved. For your situation, determine the number of sensor tasks that you want running at any one time (less than your concurrency level) and set up a pool with that number as its slot limit. Once your pool is set up, pass the pool argument to each of your sensor operators. For more on pools, see the Concepts page in Airflow's documentation. Here is an example of passing a pool argument to an operator:

from datetime import timedelta
from airflow.operators.bash_operator import BashOperator

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
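Applied to the question's scenario, the same pool argument goes on each ExternalTaskSensor. A minimal sketch, assuming a pool named sensor_pool has already been created (for example under Admin -> Pools in the web UI) with fewer slots than the Celery concurrency, and using placeholder task ids and the Airflow 1.x import path:

from airflow.operators.sensors import ExternalTaskSensor

wait_for_do_sth = ExternalTaskSensor(
    task_id='wait_for_do_sth',
    external_dag_id='do_sth',
    external_task_id='do_sth_task',  # placeholder task id in the do_sth DAG
    pool='sensor_pool',              # sensors queue for this pool's slots
    poke_interval=60,
    dag=dag,
)

With the pool in place, at most the pool's slot count of sensors occupy Celery worker slots at any time, leaving the remaining slots free for the tasks in do_sth.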
answered Dec 25 '22 by Jeremy Farrell