 

Apache Airflow: Create a reusable custom TaskGroup

I am trying to create a custom TaskGroup class to replace the existing SubDAGs in our Airflow pipelines.

I want to pass the dag object explicitly instead of using a context manager. I followed this tutorial to create the custom class.

Let's have a look at my custom TaskGroup code:

from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task

class TestTaskGroup(TaskGroup):
    def __init__(self, dag, group_id="TestTaskGroup", *args, **kwargs):
        super().__init__(dag=dag, group_id=group_id, *args, **kwargs)
        self.dag = dag

        @task(task_group=self)
        def emptyTask():
            EmptyOperator(task_id="emptytask", dag=self.dag)

        @task(task_group=self)
        def pythonTask():
            PythonOperator(task_id="pythontask", dag=self.dag, python_callable=lambda: print("Hello World"))

        emptyTask() >> pythonTask()

In testdag.py:


from taskgroups.testtaskgroup import TestTaskGroup
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="testdag",
    start_date=datetime(2024, 2, 26),
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner": "hemant.sah",
    }
) 

sometask = EmptyOperator(task_id="sometask", dag=dag)
grouptask = TestTaskGroup(dag=dag)

sometask >> grouptask

The code above gives me this error:

airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(_PythonDecoratedOperator): TestTaskGroup.emptyTask>, <Task(_PythonDecoratedOperator): TestTaskGroup.pythonTask>]

What I tried:

I tried moving TestTaskGroup inside a context manager. That works, but I want to understand why it doesn't work when I pass the dag object with TestTaskGroup(dag=dag):

with DAG() as dag:
    sometask = EmptyOperator(task_id="sometask")
    grouptask = TestTaskGroup() # in this way I am not passing dag object and it works
    sometask >> grouptask

I could not find a way to pass the dag object without creating a context manager. I prefer this approach because we have many DAGs, and following it would reduce the number of code changes.

Asked Dec 15 '25 08:12 by Hemant Sah


1 Answer

According to the tutorial, the decorated functions should return the operators:

        @task(task_group=self)
        def emptyTask():            
            return EmptyOperator(task_id="emptytask", dag=self.dag)
            

        @task(task_group=self)
        def pythonTask():            
            return PythonOperator...
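Separately from the return-based fix, the error message says the two decorated tasks ended up with no DAG at all. One plausible reading, assumed here rather than taken from Airflow's source, is that a task gets its DAG from its own dag argument or from the active with DAG(...) block, and that task_group=self on its own does not hand the group's DAG down to the task. The plain-Python model below needs no Airflow. Dag, Group, Task, current_dag and build_group are hypothetical stand-ins, not Airflow classes, and the sketch reproduces three situations: DAG passed only to the group, the same code inside a with block, and DAG passed to every task.

```python
# Plain-Python model of where a task finds its DAG. These classes are
# hypothetical stand-ins for illustration, not Airflow's real classes.

_dag_stack = []  # filled only while a `with Dag(...)` block is active


def current_dag():
    """Return the DAG of the innermost active `with` block, or None."""
    return _dag_stack[-1] if _dag_stack else None


class Dag:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def __enter__(self):
        _dag_stack.append(self)
        return self

    def __exit__(self, *exc_info):
        _dag_stack.pop()
        return False  # do not swallow exceptions


class Group:
    def __init__(self, group_id, dag=None):
        self.group_id = group_id
        self.dag = dag
        self.children = []


class Task:
    def __init__(self, task_id, dag=None, task_group=None):
        # Assumed lookup order: explicit dag, then the active `with` block.
        # The task group's DAG is deliberately NOT consulted here.
        self.task_id = task_id
        self.dag = dag or current_dag()
        self.downstream = []
        if task_group is not None:
            task_group.children.append(self)
        if self.dag is not None:
            self.dag.tasks.append(self)

    def __rshift__(self, other):
        # Simplified: refuse to link two tasks when neither has a DAG.
        if self.dag is None and other.dag is None:
            raise RuntimeError(
                "Tried to create relationships between tasks that don't have DAGs yet"
            )
        self.downstream.append(other)
        return other


def build_group(dag=None, pass_dag_to_tasks=False):
    """Hypothetical helper mirroring TestTaskGroup.__init__ from the question."""
    group = Group("TestTaskGroup", dag=dag)
    task_dag = dag if pass_dag_to_tasks else None
    first = Task("emptyTask", dag=task_dag, task_group=group)
    second = Task("pythonTask", dag=task_dag, task_group=group)
    first >> second  # like emptyTask() >> pythonTask() inside __init__
    return group, first, second


if __name__ == "__main__":
    # 1) DAG given only to the group: the tasks have none, so linking fails.
    try:
        build_group(dag=Dag("testdag"))
    except RuntimeError as err:
        print("DAG on group only ->", err)

    # 2) Same helper inside a `with` block: the tasks pick up the active DAG.
    with Dag("testdag") as dag:
        build_group()
    print([t.task_id for t in dag.tasks])  # ['emptyTask', 'pythonTask']

    # 3) DAG passed explicitly to every task: works without a `with` block.
    dag = Dag("testdag")
    build_group(dag=dag, pass_dag_to_tasks=True)
    print([t.task_id for t in dag.tasks])  # ['emptyTask', 'pythonTask']
```

If that reading is right, the third case corresponds to passing dag=self.dag to each task created in TestTaskGroup.__init__, for example @task(task_group=self, dag=self.dag), assuming the decorator forwards dag to the operator the way it forwards other BaseOperator arguments.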
Answered Dec 16 '25 21:12 by ProBoDiS


