I am trying to create a custom TaskGroup class to replace the existing SubDAGs in our Airflow pipelines.
I want to pass the dag object explicitly instead of using a context manager. I followed this tutorial to create the custom class.
Here is my custom TaskGroup code:
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task

class TestTaskGroup(TaskGroup):
    def __init__(self, dag, group_id="TestTaskGroup", *args, **kwargs):
        super().__init__(dag=dag, group_id=group_id, *args, **kwargs)
        self.dag = dag

        @task(task_group=self)
        def emptyTask():
            EmptyOperator(task_id="emptytask", dag=self.dag)

        @task(task_group=self)
        def pythonTask():
            PythonOperator(task_id="pythontask", dag=self.dag, python_callable=lambda: print("Hello World"))

        emptyTask() >> pythonTask()
In testdag.py:
from taskgroups.testtaskgroup import TestTaskGroup
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="testdag",
    start_date=datetime(2024, 2, 26),
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    default_args={
        "owner": "hemant.sah",
    },
)

sometask = EmptyOperator(task_id="sometask", dag=dag)
grouptask = TestTaskGroup(dag=dag)
sometask >> grouptask
The code above gives me this error:
airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(_PythonDecoratedOperator): TestTaskGroup.emptyTask>, <Task(_PythonDecoratedOperator): TestTaskGroup.pythonTask>]
What I tried:
I tried creating TestTaskGroup inside a context manager. That works, but I want to understand why it fails when I pass the dag object with TestTaskGroup(dag=dag).
with DAG() as dag:
    sometask = EmptyOperator(task_id="sometask")
    grouptask = TestTaskGroup()  # in this way I am not passing dag object and it works
    sometask >> grouptask
I could not find a way to pass the dag object without using a context manager. I prefer passing it explicitly because we have many DAGs, and that approach would reduce the code changes needed.
According to the tutorial, the decorated functions should return the operator:
@task(task_group=self)
def emptyTask():
    return EmptyOperator(task_id="emptytask", dag=self.dag)

@task(task_group=self)
def pythonTask():
    return PythonOperator...
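
As a side note on why the context-manager version works while the explicit dag version fails, the following is a minimal stand-alone sketch of the mechanism as it appears from the error message in the question. It is a simplified model and not Airflow's actual implementation: ToyDag, ToyTask, build_group and the forward_dag flag are all hypothetical names invented for illustration. The assumption it encodes is that a task gets its DAG either from an explicit argument or from the currently active context, and that receiving the dag in the group's constructor does not by itself hand it to the tasks created inside.

```python
# Simplified stand-alone model; NOT Airflow's real implementation.
# ToyDag, ToyTask and build_group are hypothetical names for illustration.

class ToyDag:
    _current = None  # the "active" DAG, set only inside a with-block

    def __init__(self, dag_id):
        self.dag_id = dag_id

    def __enter__(self):
        ToyDag._current = self
        return self

    def __exit__(self, *exc):
        ToyDag._current = None
        return False


class ToyTask:
    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        # An explicit argument wins; otherwise fall back to the active context.
        self.dag = dag or ToyDag._current

    def __rshift__(self, other):
        # Mirrors the error in the question: at least one side needs a DAG.
        if self.dag is None and other.dag is None:
            raise RuntimeError("tasks don't have DAGs yet")
        self.dag = other.dag = self.dag or other.dag
        return other


def build_group(dag=None, forward_dag=False):
    """Create and chain two tasks, optionally forwarding the dag to them."""
    kwargs = {"dag": dag} if forward_dag else {}
    first = ToyTask("emptyTask", **kwargs)
    second = ToyTask("pythonTask", **kwargs)
    first >> second
    return first, second


# Case 1: context manager. The tasks pick up the active DAG implicitly.
with ToyDag("testdag") as ctx_dag:
    a, b = build_group()

# Case 2: dag is received by the group but never forwarded to the tasks.
plain_dag = ToyDag("testdag")
error_seen = False
try:
    build_group(dag=plain_dag)
except RuntimeError:
    error_seen = True

# Case 3: dag is forwarded to every task explicitly.
c, d = build_group(dag=plain_dag, forward_dag=True)
```

If this model matches what happens in your Airflow version, the analogous change would be to forward the DAG to the tasks themselves, for example @task(task_group=self, dag=self.dag), or to instantiate the operators directly in __init__ with dag=dag, task_group=self. Treat this as a suggestion to try; it has not been verified against a specific Airflow release.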