I am looking at several open source workflow schedulers for a DAG of jobs with heterogeneous RAM usage. The scheduler should not only keep the number of concurrently running tasks below a maximum number of threads, but should also keep the total RAM used by all concurrent tasks below the available memory.
In this Luigi Q&A, it was explained that:
You can set how many of the resource is available in the config, and then how many of the resource the task consumes as a property on the task. This will then limit you to running n of that task at a time.
In config:
[resources]
api=1
In code for Task:
resources = {"api": 1}
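The quoted mechanism boils down to simple bookkeeping. The sketch below is plain Python, not Luigi's scheduler code, and assumes integer amounts: totals come from the [resources] section, each task declares what it consumes, and a task may start only while every resource it names still has headroom. The memory_gb resource and the task names are hypothetical; they show how RAM in gigabytes could be treated as one more named resource, which is the kind of quantitative constraint asked about above.

```python
import configparser
from typing import Dict

# The [resources] section from the quote, plus a hypothetical memory_gb entry
# showing how RAM could be modelled as just another named resource.
CONFIG_TEXT = """
[resources]
api=1
memory_gb=16
"""


def load_capacities(text: str) -> Dict[str, int]:
    """Parse the [resources] section into {resource_name: available_amount}."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {name: parser.getint("resources", name) for name in parser["resources"]}


def can_start(needs: Dict[str, int], in_use: Dict[str, int], capacity: Dict[str, int]) -> bool:
    """True if starting a task that consumes `needs` keeps every resource within capacity.

    This sketch assumes every resource a task names is declared in the config.
    """
    return all(in_use.get(name, 0) + amount <= capacity[name] for name, amount in needs.items())


def start(needs: Dict[str, int], in_use: Dict[str, int]) -> None:
    """Record the resources consumed by a task that has just started."""
    for name, amount in needs.items():
        in_use[name] = in_use.get(name, 0) + amount


capacity = load_capacities(CONFIG_TEXT)
in_use: Dict[str, int] = {}
started = []
# Hypothetical tasks, each declaring what it consumes (like `resources = {...}` on a Luigi Task).
for task_name, needs in [
    ("fetch_a", {"api": 1}),
    ("fetch_b", {"api": 1}),          # blocked: the single api unit is already in use
    ("big_job", {"memory_gb": 12}),
    ("small_job", {"memory_gb": 6}),  # blocked: 12 + 6 > 16
]:
    if can_start(needs, in_use, capacity):
        start(needs, in_use)
        started.append(task_name)

print(started)  # prints ['fetch_a', 'big_job']
```

Blocked tasks would simply be retried once a running task finishes and its consumption is subtracted from in_use again.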
For Airflow, I haven't been able to find the same functionality in its docs. The best that seems possible is to specify a number of available slots in a resource pool, and to also specify that a task instance uses a single slot in a resource pool. However, it appears there is no way to specify that a task instance uses more than one slot in a pool.
Question: specifically for Airflow, how can I specify a quantitative resource usage of a task instance?
concurrency: This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to set one DAG to be able to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.
priority_weight defines priorities in the executor queue. The default priority_weight is 1, and it can be bumped to any integer. Moreover, each task has a true (effective) priority_weight that is calculated based on its weight_rule, which defines the weighting method used to compute the effective total priority weight of the task.
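To illustrate how weight_rule turns per-task priority_weight values into an effective weight, here is a small re-implementation in plain Python (not Airflow's own code) of the three documented rules: absolute uses the task's own weight, downstream adds the weights of all downstream descendants, and upstream adds the weights of all upstream ancestors. The DAG and its weights are hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical DAG: extract -> transform -> load, and extract -> report.
DOWNSTREAM: Dict[str, List[str]] = {
    "extract": ["transform", "report"],
    "transform": ["load"],
    "load": [],
    "report": [],
}
PRIORITY: Dict[str, int] = {"extract": 1, "transform": 1, "load": 1, "report": 5}


def reachable(task: str, edges: Dict[str, List[str]]) -> Set[str]:
    """All tasks reachable from `task` by following `edges`, excluding `task` itself."""
    seen: Set[str] = set()
    stack = list(edges[task])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges[current])
    return seen


def upstream_edges(downstream: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Invert the downstream adjacency so each task maps to its direct parents."""
    up: Dict[str, List[str]] = {t: [] for t in downstream}
    for parent, children in downstream.items():
        for child in children:
            up[child].append(parent)
    return up


def effective_weight(task: str, rule: str, downstream: Dict[str, List[str]], priority: Dict[str, int]) -> int:
    """Effective priority weight of `task` under the given weight rule."""
    if rule == "absolute":
        return priority[task]
    if rule == "downstream":
        edges = downstream
    elif rule == "upstream":
        edges = upstream_edges(downstream)
    else:
        raise ValueError(f"unknown weight_rule: {rule}")
    return priority[task] + sum(priority[t] for t in reachable(task, edges))


for rule in ("absolute", "downstream", "upstream"):
    print(rule, {t: effective_weight(t, rule, DOWNSTREAM, PRIORITY) for t in DOWNSTREAM})
```

With the downstream rule, extract ends up with weight 8 (its own 1 plus 1 + 5 + 1 from everything after it), so tasks near the start of a long chain are favoured; the upstream rule instead favours tasks near the end.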
Assuming you're using the CeleryExecutor, then starting from Airflow version 1.9.0 you can manage Celery's task concurrency. This is not exactly the memory management you've been asking about, but rather the number of concurrent worker threads executing tasks.
The parameter to tweak is called CELERYD_CONCURRENCY, and how to manage Celery-related configuration in Airflow is very nicely explained here.
[Edit]
Actually, Pools could also be used to limit concurrency. 
Let's say you want to limit a resource-hungry task_id so that only 2 instances run at the same time. All you need to do is:
1. Create a pool (in the UI: Admin -> Pools), give it a name, e.g. my_pool, and set the task's concurrency in the Slots field (in this case 2).
2. When instantiating the Operator that will execute this task_id, pass the defined pool name (pool="my_pool").
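A pool with N slots behaves much like a counting semaphore shared by the task instances assigned to it, each instance occupying one slot while it runs. The sketch below uses threading.BoundedSemaphore purely as an analogy (Airflow's scheduler enforces pools itself, not via Python semaphores): POOL_SLOTS plays the role of the Slots value of my_pool, and six threads stand in for queued instances of the resource-hungry task.

```python
import threading
import time

POOL_SLOTS = 2  # analogous to the "Slots" value entered for my_pool

pool = threading.BoundedSemaphore(POOL_SLOTS)
lock = threading.Lock()
running = 0
max_running = 0
finished = []


def task_instance(n: int) -> None:
    """Stand-in for one task instance assigned to the pool."""
    global running, max_running
    with pool:  # blocks until one of the pool's slots is free
        with lock:
            running += 1
            max_running = max(max_running, running)
        time.sleep(0.05)  # placeholder for the resource-hungry work
        with lock:
            running -= 1
            finished.append(n)


threads = [threading.Thread(target=task_instance, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"max concurrent instances: {max_running}")
```

However many instances are queued, at most POOL_SLOTS of them hold a slot at any moment; the rest wait until a running one releases its slot.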