Airflow Branch Operator and Task Group Invalid Task IDs

Tags:

python

airflow

I have a simple DAG that uses a branch operator to check whether y is False. If it is, the DAG is supposed to move on to the say_goodbye task group. If y is True, it skips the task group and goes straight to finish_dag_step. Here's the DAG:

# Imports assumed (Airflow 2.x module paths); the original post omitted them
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

def which_step() -> str:
    y = False
    if not y:
        return 'say_goodbye'
    else:
        return 'finish_dag_step'

with DAG(
    'my_test_dag',
    start_date = datetime(2022, 5, 14),
    schedule_interval = '0 0 * * *',
    catchup = True) as dag:

    say_hello = BashOperator(
        task_id = 'say_hello',
        retries = 3,
        bash_command = 'echo "hello world"'
    )

    run_which_step = BranchPythonOperator(
        task_id = 'run_which_step',
        python_callable = which_step,
        retries = 3,
        retry_exponential_backoff = True,
        retry_delay = timedelta(seconds = 5)
    )

    with TaskGroup('say_goodbye') as say_goodbye:
        for i in range(0,2):
            step = BashOperator(
                task_id = 'step_' + str(i),
                retries = 3,
                bash_command = 'echo "goodbye world"'
            )

            step

    finish_dag_step = BashOperator(
        task_id = 'finish_dag_step',
        retries = 3,
        bash_command = 'echo "dag is finished"'
    )

    say_hello >> run_which_step
    run_which_step >> say_goodbye >> finish_dag_step
    run_which_step >> finish_dag_step
    finish_dag_step

I get the following errors when the dag hits run_which_step:

[Two screenshots of the error messages; images not available]

I don't understand what's causing this. What is going on?

fjjones88 asked Oct 17 '25 00:10


1 Answer

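A TaskGroup is not a task, so your branch callable can't return its name: the branch operator expects valid task_ids, and say_goodbye is only a group_id. Therefore, you have to refer to the tasks inside the group by their task_id, which is the TaskGroup's group_id and the task's own id joined by a dot (group_id.task_id), for example say_goodbye.step_0.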
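To make the naming rule concrete, here is a minimal plain-Python sketch. It does not call Airflow and is not Airflow's actual validation code; the helper names qualified_task_id and invalid_branch_ids are hypothetical, and the set of task ids is assumed from the DAG in the question.

```python
def qualified_task_id(group_id: str, task_id: str) -> str:
    """Join a TaskGroup's group_id and a task's own id with a dot."""
    return f"{group_id}.{task_id}"


# Task ids as they would exist in the DAG from the question (assumed layout).
dag_task_ids = {
    "say_hello",
    "run_which_step",
    qualified_task_id("say_goodbye", "step_0"),
    qualified_task_id("say_goodbye", "step_1"),
    "finish_dag_step",
}


def invalid_branch_ids(returned, known_ids):
    """Return the branch targets that do not name an existing task."""
    if isinstance(returned, str):
        returned = [returned]
    return set(returned) - set(known_ids)


# The group name alone is not a task id, so it is reported as invalid.
print(invalid_branch_ids("say_goodbye", dag_task_ids))  # {'say_goodbye'}

# The fully qualified ids of the grouped tasks are all known.
print(invalid_branch_ids(["say_goodbye.step_0", "say_goodbye.step_1"], dag_task_ids))  # set()
```

The point of the sketch is only that say_goodbye never appears among the DAG's task ids, while say_goodbye.step_0 and say_goodbye.step_1 do.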

Your branching function should return something like:

def which_step():
    y = False
    if not y:
        # Full task_ids of the tasks inside the say_goodbye TaskGroup
        return [f'say_goodbye.step_{i}' for i in range(2)]
    return 'finish_dag_step'

Instead of returning a list of task ids this way, though, the easiest option is probably to put a DummyOperator upstream of the TaskGroup and return its task_id from the branching function. It would effectively act as an entry point to the whole group.
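A rough sketch of that entry-point layout, adapted to the DAG from the question, might look like the following. Several things here are assumptions rather than part of the original posts: the task_id say_goodbye_entry and the build_dag factory are hypothetical names, the import paths are the Airflow 2.x ones, and the trigger_rule on finish_dag_step is added on the assumption that the final task should still run when the say_goodbye branch is skipped. The Airflow imports sit inside the factory so the branch callable can be checked without Airflow installed.

```python
from datetime import datetime


def which_step() -> str:
    """Branch callable: return the task_id of the single task to follow."""
    y = False
    if not y:
        # Hypothetical entry-point task placed upstream of the TaskGroup.
        return "say_goodbye_entry"
    return "finish_dag_step"


def build_dag():
    """Build the DAG; Airflow is imported here so which_step stays testable alone."""
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator  # named EmptyOperator in newer releases
    from airflow.operators.python import BranchPythonOperator
    from airflow.utils.task_group import TaskGroup

    with DAG(
        "my_test_dag",
        start_date=datetime(2022, 5, 14),
        schedule_interval="0 0 * * *",
        catchup=True,
    ) as dag:
        say_hello = BashOperator(task_id="say_hello", bash_command='echo "hello world"')
        run_which_step = BranchPythonOperator(
            task_id="run_which_step", python_callable=which_step
        )
        # A real task the branch can point at; it does no work itself.
        say_goodbye_entry = DummyOperator(task_id="say_goodbye_entry")

        with TaskGroup("say_goodbye") as say_goodbye:
            for i in range(2):
                BashOperator(task_id=f"step_{i}", bash_command='echo "goodbye world"')

        finish_dag_step = BashOperator(
            task_id="finish_dag_step",
            bash_command='echo "dag is finished"',
            # Assumption: run even if the say_goodbye tasks were skipped.
            trigger_rule="none_failed_min_one_success",
        )

        say_hello >> run_which_step
        run_which_step >> say_goodbye_entry >> say_goodbye >> finish_dag_step
        run_which_step >> finish_dag_step

    return dag
```

In an actual DAG file you would assign the result at module level (dag = build_dag()) so the scheduler can discover it. The branch now returns a plain task_id, so it no longer needs to know which tasks the group contains.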

EDG956 answered Oct 18 '25 15:10