Airflow: how to specify quantitative usage of a resource pool?

I am looking at several open source workflow schedulers for a DAG of jobs with heterogeneous RAM usage. The scheduler should not only keep the number of concurrently running tasks below a maximum thread count, but also keep the total RAM used by all concurrent tasks below the available memory.

In this Luigi Q&A, it was explained that

You can set how many of the resource is available in the config, and then how many of the resource the task consumes as a property on the task. This will then limit you to running n of that task at a time.

in config:

[resources]
api=1

in code for Task:

resources = {"api": 1}
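The Luigi mechanism quoted above comes down to simple bookkeeping. A task is started only if, for every resource it declares, the amount already held by running tasks plus the task's own request stays within the configured total. Below is a minimal pure-Python sketch of that check. It assumes a hypothetical `can_schedule` helper (this is not Luigi's scheduler code) and a made-up `ram_gb` resource alongside `api`.

```python
from collections import Counter
from typing import Dict, Iterable

# Totals as they might appear under [resources] in the config.
# "ram_gb" is a hypothetical resource used only for illustration.
CAPACITY: Dict[str, int] = {"api": 1, "ram_gb": 64}


def in_use(running: Iterable[Dict[str, int]]) -> Counter:
    """Sum the resources declared by all currently running tasks."""
    total: Counter = Counter()
    for resources in running:
        total.update(resources)  # Counter.update adds amounts per key
    return total


def can_schedule(task: Dict[str, int], running: Iterable[Dict[str, int]],
                 capacity: Dict[str, int] = CAPACITY) -> bool:
    """Return True if starting `task` keeps every resource within capacity.

    In this sketch, resources missing from `capacity` are treated as unlimited.
    """
    used = in_use(running)
    return all(
        used[name] + amount <= capacity[name]
        for name, amount in task.items()
        if name in capacity
    )


running = [{"ram_gb": 40}]
print(can_schedule({"ram_gb": 16}, running))             # True: 40 + 16 <= 64
print(can_schedule({"ram_gb": 32}, running))             # False: 40 + 32 > 64
print(can_schedule({"api": 1}, running + [{"api": 1}]))  # False: api already taken
```

Because each task declares an amount rather than a single slot, a 48 GB task and an 8 GB task consume the shared budget very differently. That is the behaviour the question is looking for in Airflow.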

For Airflow, I haven't been able to find the same functionality in its docs. The closest feature seems to be resource pools: you can specify the number of slots available in a pool and assign a task instance to that pool, where it occupies a single slot. However, there appears to be no way to specify that a task instance uses more than one slot in a pool.
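The following toy model makes the limitation concrete. It is a hypothetical `SlotPool` class, not Airflow code, that follows the pool semantics described above. The pool only counts occupied slots, and every task instance occupies exactly one, so a 1 GB task and a 50 GB task look identical to it.

```python
class SlotPool:
    """Toy model of a pool in which every task instance occupies one slot."""

    def __init__(self, name: str, slots: int) -> None:
        self.name = name
        self.slots = slots
        self.occupied = 0

    def try_acquire(self) -> bool:
        """Occupy one slot if one is free; the task's RAM plays no role."""
        if self.occupied < self.slots:
            self.occupied += 1
            return True
        return False

    def release(self) -> None:
        """Free one slot when a task instance finishes."""
        if self.occupied == 0:
            raise RuntimeError("release() called on an empty pool")
        self.occupied -= 1


pool = SlotPool("ram_pool", slots=2)
print(pool.try_acquire())  # True  (e.g. a 1 GB task)
print(pool.try_acquire())  # True  (e.g. a 50 GB task)
print(pool.try_acquire())  # False (pool full, however much RAM is actually free)
```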

Question: specifically for Airflow, how can I specify a quantitative resource usage of a task instance?

asked Sep 02 '18 at 20:09 by TemplateRex

People also ask

What is concurrency in Airflow?

concurrency: This is the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. For example, one DAG can be allowed to run 32 tasks at once while another DAG is limited to 16.
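Here is a small sketch of how a per-DAG cap like this behaves. It assumes a hypothetical `launchable` helper and represents task instances as plain `(dag_id, run_id)` tuples. As in the definition, running instances are counted across all runs of the same DAG. This illustrates the rule only and is not Airflow's scheduler code.

```python
from typing import Dict, List, Tuple

TaskInstance = Tuple[str, str]  # (dag_id, run_id)


def launchable(queued: List[TaskInstance], running: List[TaskInstance],
               concurrency: Dict[str, int]) -> List[TaskInstance]:
    """Pick queued task instances to start without exceeding each DAG's cap.

    The cap applies to running instances across all active runs of a DAG.
    """
    active: Dict[str, int] = {}
    for dag_id, _run_id in running:
        active[dag_id] = active.get(dag_id, 0) + 1

    selected: List[TaskInstance] = []
    for ti in queued:
        dag_id = ti[0]
        if active.get(dag_id, 0) < concurrency[dag_id]:
            active[dag_id] = active.get(dag_id, 0) + 1
            selected.append(ti)
    return selected


running = [("dag_a", "run_1")]
queued = [("dag_a", "run_2"), ("dag_a", "run_2"), ("dag_b", "run_1")]
print(launchable(queued, running, {"dag_a": 2, "dag_b": 16}))
# [('dag_a', 'run_2'), ('dag_b', 'run_1')]
```

The second `dag_a` instance stays queued because one `dag_a` task is already running in `run_1`, even though it belongs to a different run.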

What is priority_weight in Airflow?

priority_weight defines priorities in the executor queue. The default priority_weight is 1, and it can be bumped to any integer. Moreover, each task has an effective ("true") priority weight, calculated according to its weight_rule, which defines the weighting method used to compute the task's effective total priority weight.
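A sketch of how an effective weight can be derived from a weight rule follows. It assumes the three rule names used by Airflow (`downstream`, the default, plus `upstream` and `absolute`). Under `downstream`, a task's own weight is added to the weights of all its descendants. Under `upstream`, it is added to the weights of all its ancestors. Under `absolute`, only the task's own weight counts. The `effective_weight` function and the example DAG are hypothetical illustrations, not Airflow's implementation.

```python
from typing import Dict, List, Set


def _reachable(start: str, edges: Dict[str, List[str]]) -> Set[str]:
    """All nodes reachable from `start` (excluding `start`) along `edges`."""
    seen: Set[str] = set()
    stack = list(edges.get(start, []))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges.get(node, []))
    return seen


def effective_weight(task: str, weights: Dict[str, int],
                     downstream: Dict[str, List[str]],
                     rule: str = "downstream") -> int:
    """Own weight plus the weights of the tasks selected by `rule`."""
    if rule == "absolute":
        return weights[task]
    if rule == "downstream":
        related = _reachable(task, downstream)
    elif rule == "upstream":
        # Invert the parent -> children mapping to walk towards ancestors.
        upstream: Dict[str, List[str]] = {}
        for parent, children in downstream.items():
            for child in children:
                upstream.setdefault(child, []).append(parent)
        related = _reachable(task, upstream)
    else:
        raise ValueError(f"unknown weight rule: {rule}")
    return weights[task] + sum(weights[t] for t in related)


# Hypothetical DAG: extract -> transform -> load, and extract -> report.
weights = {"extract": 1, "transform": 1, "load": 1, "report": 1}
downstream = {"extract": ["transform", "report"], "transform": ["load"]}
print(effective_weight("extract", weights, downstream))           # 4
print(effective_weight("load", weights, downstream, "upstream"))  # 3
print(effective_weight("load", weights, downstream, "absolute"))  # 1
```

With the default rule, tasks near the start of a long chain get higher effective weights, so work that unblocks many downstream tasks tends to be scheduled first.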


1 Answer

Assuming you're using the CeleryExecutor, starting from Airflow 1.9.0 you can manage Celery's task concurrency. This is not exactly the memory management you asked about; it limits the number of concurrent worker threads executing tasks.

The parameter to tweak is called CELERYD_CONCURRENCY, and here is a very nice explanation of how to manage Celery-related config in Airflow.

[Edit]

Actually, pools can also be used to limit concurrency. Say you want to limit a resource-hungry task_id so that only 2 instances run at the same time. All you need to do is:

  • create a pool (in the UI: Admin -> Pools), give it a name, e.g. my_pool, and set the task's concurrency in the Slots field (in this case 2)

  • when instantiating the operator that will execute this task_id, pass the pool name (pool='my_pool')
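The effect of the two steps above can be modelled with a deterministic toy simulation. The `simulate` function below is hypothetical and is not Airflow's scheduler. In it, tasks assigned to a pool wait until enough slots are free. Each task also carries a slot count. With every count set to 1, it reproduces the setup described here: at most 2 instances run at once. Larger counts show the quantitative usage the question asks about, for example treating slots as gigabytes of RAM. Airflow 2.x operators accept a `pool_slots` argument for this purpose; check the docs for your version before relying on it.

```python
from typing import List, Tuple

Task = Tuple[str, int, int]  # (name, slots needed, duration in ticks)


def simulate(tasks: List[Task], pool_slots: int) -> List[Tuple[int, List[str]]]:
    """Return (tick, running task names) for each tick until all tasks finish.

    At every tick, waiting tasks are started in order if enough slots are free;
    a task that does not fit stays queued while later, smaller ones may start.
    """
    for name, slots, _ in tasks:
        if slots > pool_slots:
            raise ValueError(f"{name} needs more slots than the pool has")
    waiting = list(tasks)
    running: List[list] = []  # [name, slots, remaining ticks]
    timeline = []
    tick = 0
    while waiting or running:
        free = pool_slots - sum(r[1] for r in running)
        still_waiting = []
        for name, slots, duration in waiting:
            if slots <= free:
                running.append([name, slots, duration])
                free -= slots
            else:
                still_waiting.append((name, slots, duration))
        waiting = still_waiting
        timeline.append((tick, [r[0] for r in running]))
        for r in running:
            r[2] -= 1
        running = [r for r in running if r[2] > 0]
        tick += 1
    return timeline


# One slot per task, pool of 2: the third instance waits for a free slot.
print(simulate([("hungry_1", 1, 2), ("hungry_2", 1, 2), ("hungry_3", 1, 2)], 2))
# Slots as GB of RAM in a 64-slot pool: "medium" waits until "big" finishes.
print(simulate([("big", 48, 2), ("medium", 24, 1), ("small", 8, 1)], 64))
# [(0, ['big', 'small']), (1, ['big']), (2, ['medium'])]
```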

answered Sep 20 '22 at 10:09 by bartgras