Access parent DAG context at subDAG creation time in Airflow?



I'm trying to access XCom data from the parent DAG at subDAG creation time. I searched online for a way to do this but didn't find anything.

def test(task_id):
    logging.info(f' execution of task {task_id}')

def load_subdag(parent_dag_id, child_dag_id, args):
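    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        # ... remaining DAG arguments elided in the original post
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        # Fails here: no context exists while the DAG file is being parsed
        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                # ... other required arguments elided in the original post
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
            )

    return dag_subdag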

load_tasks = SubDagOperator(
                           'load_tasks', args),

I got this error with my code:

airflow_1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath)
airflow_1  |   File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1  |     module = _load(spec)
airflow_1  |   File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1  |   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1  |   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1  |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1  |     'load_tasks', args),
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1  |     for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1  | TypeError: xcom_pull() missing 1 required positional argument: 'context'

How do you find the context of Airflow?

When Airflow runs a task, it collects several variables and passes them as the context argument to the operator's execute() method. These variables hold information about the current task; the full list is here: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html#default-variables.

What is upstream and downstream in Airflow?

Upstream task: A task that must reach a specified state before a dependent task can run. Downstream task: A dependent task that cannot run until an upstream task reaches a specified state.

1 Answer

The error is simple: you are missing the context argument required by the xcom_pull() method. But you can't just construct a context yourself and pass it in; it is a Python dictionary that Airflow passes to anchor methods like pre_execute() and execute() of BaseOperator (the parent class of all operators).

In other words, the context becomes available only when an operator is actually executed, not during DAG definition. That makes sense because, in Airflow's taxonomy, XComs are a communication mechanism between tasks at run time: tasks talking to each other while they are running.
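To make the point concrete, here is a minimal sketch of pulling the XCom inside a callable, where the context does exist. The function name `run_loads` and the `FakeTaskInstance` class are hypothetical and only serve to exercise the callable outside Airflow; the task id and key are taken from the question. It assumes the callable is wired to a PythonOperator inside the subDAG (on Airflow 1.10 with `provide_context=True`; on Airflow 2.x the context is passed to `**kwargs` automatically).

```python
def run_loads(parent_dag_id, **context):
    """Pull the count at execution time, when Airflow supplies the context."""
    ti = context["ti"]  # the running TaskInstance, provided by Airflow
    n = ti.xcom_pull(task_ids="take_Ana", key="the_message", dag_id=parent_dag_id)
    # Do the per-item work inside this one task instead of creating n tasks.
    return ["load_subdag_{0}".format(i) for i in range(n)]


class FakeTaskInstance:
    """Hypothetical stand-in for a TaskInstance, for local illustration only."""

    def xcom_pull(self, task_ids=None, key=None, dag_id=None):
        return 3


print(run_loads("parent_dag", ti=FakeTaskInstance()))
# ['load_subdag_0', 'load_subdag_1', 'load_subdag_2']
```

The trade-off is that the loop now runs inside a single task rather than as separate tasks, so the DAG structure stays fixed regardless of the pulled value.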

But at the end of the day XComs, just like every other Airflow model, are persisted in the backend meta-db. So you can retrieve them directly from there (obviously only the XComs of tasks that have already run). While I don't have a code snippet, you can have a look at cli.py, where the SQLAlchemy ORM is used to work with models and the backend db. Do understand that this means a query is fired at your backend db every time the DAG-definition file is parsed, which happens quite frequently.

Useful links

  • How can one set a variable for use only during a certain dag_run
  • How to pull xcom value from other task instance in the same DAG run (not the most recent one)?


After looking at your code snippet, I got alarmed. Assuming the value returned by xcom_pull() keeps changing frequently, the number of tasks in your DAG will also keep changing. This can lead to unpredictable behaviour (you should do a fair bit of research, but I don't have a good feeling about it).

I'd suggest you revisit your entire task workflow and condense it down to a design where the

  • number of tasks and
  • structure of the DAG

are known ahead of time (at the time the DAG-definition file is executed). You can of course iterate over a JSON file, the result of a SQL query (like the SQLAlchemy approach mentioned earlier), etc. to spawn your actual tasks, but that file / db / whatever shouldn't be changing frequently.

Do understand that merely iterating over a list to generate tasks is not problematic; what's NOT possible is to have the structure of your DAG depend on the result of an upstream task. For example, you can't have n tasks created in your DAG based on an upstream task calculating the value of n at run time.
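As an illustration of the unproblematic case, the sketch below generates tasks from a static list that is known when the DAG file is parsed. The config contents, the DAG id and the helper names (`CONFIG`, `task_ids`, `build_dag`) are all hypothetical, and the imports assume Airflow 2.x. Airflow is imported inside `build_dag()` only so that the list-handling part can be run on its own.

```python
import json
from datetime import datetime

# Hypothetical static config; in practice a rarely-changing file next to the DAG.
CONFIG = json.loads('{"tables": ["users", "orders", "payments"]}')


def task_ids(config):
    """Derive one task id per configured table."""
    return ["load_{0}".format(name) for name in config["tables"]]


def build_dag():
    # Assumes Airflow 2.x import paths.
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="static_fanout_example",
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        # The loop runs at parse time, so the task count is fixed per parse.
        for tid in task_ids(CONFIG):
            PythonOperator(task_id=tid, python_callable=print, op_args=[tid])
    return dag
```

In an actual DAG file you would assign `dag = build_dag()` at module level so the scheduler can discover it.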

So this is not possible:

  • Airflow dynamic tasks at runtime
  • Is there a way to create dynamic workflows in Airflow
  • Dynamically create list of tasks

But this is possible (including what you are trying to achieve, even though the way you are doing it doesn't seem like a good idea):

  • Dynamically Generating DAGs in Airflow
  • Airflow DAG dynamic structure
  • etsy/boundary-layer
  • ajbosco/dag-factory


So as it turns out, generating tasks from the output of upstream tasks is possible after all, although it requires a significant amount of knowledge of the internal workings of Airflow as well as a tinge of creativity.

  • In fact, unless you really understand it, I would strongly recommend staying away from it.
  • But for those who know no bounds, here's the trick: Proper way to create Dynamic Workflows in Airflow


Airflow 2.3 added Dynamic Task Mapping. It can be used to iterate over a list, including one returned by an upstream task at run time, and spin up a task for each item.
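A minimal sketch of Dynamic Task Mapping, assuming Airflow 2.3 or later. The function names, the DAG id and the returned list are hypothetical stand-ins for the question's `take_Ana` task and its loads. The task bodies are kept as plain functions, and Airflow is imported inside `build_dag()`, so the logic can be tried without an Airflow installation.

```python
from datetime import datetime


def list_items():
    """Stand-in for the upstream task; its return value is pushed as an XCom."""
    return [0, 1, 2]


def load_item(i):
    """Runs once per element of the upstream list."""
    return "load_subdag_{0}".format(i)


def build_dag():
    # Assumes Airflow >= 2.3, where mapped tasks and .expand() are available.
    from airflow import DAG
    from airflow.decorators import task

    with DAG(
        dag_id="mapped_load_example",
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        items = task(list_items)()
        # One mapped task instance is created per item when the DAG runs.
        task(load_item).expand(i=items)
    return dag
```

As with any DAG file, `dag = build_dag()` would go at module level. The number of `load_item` instances is decided when the run executes, not when the file is parsed, which is the behaviour the question was after.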

