Create dynamic pool in Airflow

Tags:

airflow

I have a DAG that creates a cluster, starts computation tasks, and tears down the cluster after they complete. I want to limit the concurrency of the computation tasks running on this cluster to a fixed number. So logically, I need a pool that is exclusive to the cluster created by a task. I don't want interference with other DAGs or with different runs of the same DAG.

I thought I could solve this by creating a pool dynamically from a task once the cluster is created, and deleting it once the computation tasks are finished. I also thought I could template the pool parameter of the computation tasks so that they use this dynamically created pool.

# execute registers a pool and returns with the pool name
create_pool = CreatePoolOperator(
    slots=4,
    task_id='create_pool',
    dag=self
)

# the pool parameter is templated
computation = ComputeOperator(
    task_id=compute_subtask_name,
    pool="{{ ti.xcom_pull(task_ids='create_pool') }}",
    dag=self
)

create_pool >> computation

But this way the computation tasks are never triggered. So I think the pool parameter is saved in the task instance before it is templated. I would like to hear your thoughts on how to achieve the desired behavior.

Midiparse asked Dec 06 '25 07:12


1 Answer

Here is an operator that creates a pool if it doesn't exist.

from airflow.api.common.experimental.pool import get_pool, create_pool
from airflow.exceptions import PoolNotFound
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CreatePoolOperator(BaseOperator):
    # it's pool blue, get it?
    ui_color = '#b8e9ee'

    @apply_defaults
    def __init__(
            self,
            name,
            slots,
            description='',
            *args, **kwargs):
        super(CreatePoolOperator, self).__init__(*args, **kwargs)
        self.description = description
        self.slots = slots
        self.name = name

    def execute(self, context):
        try:
            # get_pool raises PoolNotFound when the pool does not exist
            pool = get_pool(name=self.name)
            if pool:
                # self.log is a logger object, so call one of its methods
                self.log.info(f'Pool exists: {pool}')
                return
        except PoolNotFound:
            # create the pool
            pool = create_pool(name=self.name, slots=self.slots, description=self.description)
            self.log.info(f'Created pool: {pool}')

Deleting the pool could be done in a similar manner.
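As a rough sketch of what "similar" could look like, the helper below deletes a pool and treats an already-missing pool as a non-error, so a teardown task can be retried safely. The name `delete_pool_if_exists` is hypothetical, and the delete function and the "not found" exception are passed in as arguments so the snippet does not depend on where a given Airflow version keeps them. The `Fake...` names are stand-ins for demonstration only and are not Airflow code.

```python
def delete_pool_if_exists(name, delete_pool, pool_not_found):
    """Delete the pool called `name`; return True if a pool was deleted.

    `delete_pool` is any callable accepting `name=...`, and `pool_not_found`
    is the exception type it raises for a missing pool. Both are injected
    (an assumption of this sketch) rather than imported here.
    """
    try:
        delete_pool(name=name)
    except pool_not_found:
        # The pool is already gone, so teardown can still succeed.
        return False
    return True


# Stand-ins for demonstration only (hypothetical, not part of Airflow).
class FakePoolNotFound(Exception):
    pass


pools = {'cluster_pool': 4}


def fake_delete_pool(name):
    if name not in pools:
        raise FakePoolNotFound(name)
    return pools.pop(name)


# First call removes the pool, second call finds nothing to remove.
print(delete_pool_if_exists('cluster_pool', fake_delete_pool, FakePoolNotFound))  # True
print(delete_pool_if_exists('cluster_pool', fake_delete_pool, FakePoolNotFound))  # False
```

Inside a `DeletePoolOperator.execute`, the same helper could presumably be called with `PoolNotFound` from `airflow.exceptions` and a `delete_pool` function from the same experimental module the imports above use, assuming your Airflow version provides one there.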

Dave C answered Dec 11 '25 08:12
