Suppose the following situation:
[c1, c2, c3] >> child_task
where c1, c2, c3 and child_task are all operators and have task_id equal to id1, id2, id3 and child_id respectively. Task child_task is also a PythonOperator with provide_context=True and python_callable=dummy_func:
def dummy_func(**context):
    # ...
Is it possible to retrieve all parents' ids inside dummy_func (perhaps by browsing the DAG somehow using the context)? The expected result in this case would be the list ['id1', 'id2', 'id3'].
The upstream_task_ids and downstream_task_ids properties of BaseOperator are meant just for this purpose.
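from typing import List
# ...
# list() is used because, depending on the Airflow version, these properties may be a set rather than a list
parent_task_ids: List[str] = list(my_task.upstream_task_ids)
child_task_ids: List[str] = list(my_task.downstream_task_ids)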
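To tie this back to the question: inside a PythonOperator callable, the running operator is normally available in the context under the "task" key, so the parent ids can be read from there. The following is a minimal sketch, not a tested Airflow DAG; the SimpleNamespace object and the name fake_context are hypothetical stand-ins for what Airflow would pass in, used only so the snippet runs on its own.

```python
from types import SimpleNamespace
from typing import List


def dummy_func(**context) -> List[str]:
    # Airflow places the currently running operator in the context as "task".
    task = context["task"]
    # sorted() returns a stable list whether the property is a set or a list.
    return sorted(task.upstream_task_ids)


# Hypothetical stand-in for the context Airflow would supply at run time.
fake_context = {
    "task": SimpleNamespace(
        task_id="child_id",
        upstream_task_ids={"id1", "id2", "id3"},
    )
}

parent_ids = dummy_func(**fake_context)
print(parent_ids)
```

In a real DAG the context is supplied by Airflow itself, so only the body of dummy_func would be needed.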
Do note, however, that these properties only give you the immediate (upstream / downstream) neighbours of a task. To get all ancestor or descendant tasks, you can quickly cook up the good old graph-theory approach, such as this BFS-like implementation:
from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator


def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
    ancestor_task_ids: Set[str] = set()
    tasks_queue: Queue = Queue()
    # determine parent tasks to begin BFS
    for task in my_task.upstream_list:
        tasks_queue.put(item=task)
    # perform BFS
    while not tasks_queue.empty():
        task: BaseOperator = tasks_queue.get()
        # skip tasks already reached through another path
        if task.task_id in ancestor_task_ids:
            continue
        ancestor_task_ids.add(task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(item=_task)
    # convert task_ids to actual tasks
    ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
    return ancestor_tasks
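The same traversal can be pointed in either direction by choosing which neighbour list to follow. Below is a minimal sketch of that idea, assuming only that each task exposes task_id, upstream_list and downstream_list (as BaseOperator does). The helpers collect_task_ids, make_task and link are hypothetical names, and the SimpleNamespace objects are stand-ins so the example runs without Airflow installed.

```python
from collections import deque
from types import SimpleNamespace
from typing import Deque, List, Set


def collect_task_ids(start_task, relation: str = "upstream_list") -> List[str]:
    """Return sorted ids of every task reachable from start_task via relation."""
    seen: Set[str] = set()
    queue: Deque = deque(getattr(start_task, relation))
    while queue:
        task = queue.popleft()
        if task.task_id in seen:
            continue  # already reached through another path
        seen.add(task.task_id)
        queue.extend(getattr(task, relation))
    return sorted(seen)


def make_task(task_id: str) -> SimpleNamespace:
    # Hypothetical stand-in for an operator; not an Airflow class.
    return SimpleNamespace(task_id=task_id, upstream_list=[], downstream_list=[])


def link(upstream: SimpleNamespace, downstream: SimpleNamespace) -> None:
    # Mimics the effect of `upstream >> downstream` on the stand-in objects.
    upstream.downstream_list.append(downstream)
    downstream.upstream_list.append(upstream)


a, b, c, d = (make_task(name) for name in "abcd")
link(a, b)
link(b, d)
link(c, d)

ancestors_of_d = collect_task_ids(d)
descendants_of_a = collect_task_ids(a, relation="downstream_list")
print(ancestors_of_d)
print(descendants_of_a)
```

Tracking visited ids keeps each task from being processed more than once when several paths lead to it, which matters in wide DAGs with many shared ancestors.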
The above snippet is NOT tested, but I'm sure you can take inspiration from it.