I have a DAG that creates a cluster, starts computation tasks, and, after they complete, tears down the cluster. I want to limit the concurrency of the computation tasks running on this cluster to a fixed number. So logically, I need a pool that is exclusive to the cluster created by a task. I don't want interference with other DAGs or with different runs of the same DAG.
I thought I could solve this problem by creating a pool dynamically from a task after the cluster is created and deleting it once the computation tasks are finished. I thought I could template the pool parameter of the computation tasks to make them use this dynamically created pool.
# execute registers a pool and returns with the pool name
create_pool = CreatePoolOperator(
    slots=4,
    task_id='create_pool',
    dag=self
)

# the pool parameter is templated
computation = ComputeOperator(
    task_id=compute_subtask_name,
    pool="{{ ti.xcom_pull(task_ids='create_pool') }}",
    dag=self
)

create_pool >> computation
But this way the computation tasks are never triggered. So I think the pool parameter is saved in the task instance before being templated. I would like to hear your thoughts on how to achieve the desired behavior.
Here is an operator that creates a pool if it doesn't exist.
from airflow.api.common.experimental.pool import get_pool, create_pool
from airflow.exceptions import PoolNotFound
from airflow.models import BaseOperator
# apply_defaults lives in airflow.utils.decorators in Airflow 1.10
from airflow.utils.decorators import apply_defaults


class CreatePoolOperator(BaseOperator):
    # it's pool blue, get it?
    ui_color = '#b8e9ee'

    @apply_defaults
    def __init__(
            self,
            name,
            slots,
            description='',
            *args, **kwargs):
        super(CreatePoolOperator, self).__init__(*args, **kwargs)
        self.description = description
        self.slots = slots
        self.name = name

    def execute(self, context):
        try:
            # get_pool raises PoolNotFound when the pool is missing
            pool = get_pool(name=self.name)
            if pool:
                # self.log is a logger object, so call .info() on it
                self.log.info(f'Pool exists: {pool}')
                return
        except PoolNotFound:
            # create the pool
            pool = create_pool(name=self.name, slots=self.slots, description=self.description)
            self.log.info(f'Created pool: {pool}')
Deleting the pool could be done in a similar manner.
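As a rough illustration of that cleanup step, here is a minimal sketch of the delete logic written as a plain function, so it runs without an Airflow installation. The names `delete_pool_if_exists`, `FakePoolNotFound` and `make_fake_delete` are hypothetical and only for illustration; in a real operator, `delete_fn` would presumably be `delete_pool` from `airflow.api.common.experimental.pool` and `not_found_exc` would be `PoolNotFound`.

```python
def delete_pool_if_exists(name, delete_fn, not_found_exc, log=print):
    """Delete the pool called `name`; return True if it was removed.

    delete_fn     -- callable accepting name=..., e.g. Airflow's delete_pool
    not_found_exc -- exception type raised when the pool is missing,
                     e.g. airflow.exceptions.PoolNotFound
    log           -- callable accepting a message, e.g. self.log.info
    """
    try:
        pool = delete_fn(name=name)
    except not_found_exc:
        # A missing pool is not an error here, so reruns stay safe.
        log(f'Pool not found, nothing to delete: {name}')
        return False
    log(f'Deleted pool: {pool}')
    return True


class FakePoolNotFound(Exception):
    """Hypothetical stand-in for airflow.exceptions.PoolNotFound."""


def make_fake_delete(pools):
    """Hypothetical stand-in for Airflow's delete_pool, backed by a dict."""
    def fake_delete(name):
        if name not in pools:
            raise FakePoolNotFound(name)
        return pools.pop(name)
    return fake_delete


# Exercise the helper twice: the first call deletes, the second finds nothing.
pools = {'cluster_pool': 4}
messages = []
fake_delete = make_fake_delete(pools)
first = delete_pool_if_exists('cluster_pool', fake_delete, FakePoolNotFound, messages.append)
second = delete_pool_if_exists('cluster_pool', fake_delete, FakePoolNotFound, messages.append)
```

Inside a `DeletePoolOperator.execute`, the call would look something like `delete_pool_if_exists(self.name, delete_pool, PoolNotFound, self.log.info)`. If the pool should be removed even when a computation task fails, giving the delete task `trigger_rule='all_done'` is one option worth considering.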