Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Access parent dag context at subtag creation time in airflow?

Tags:

airflow

I'm trying to access at subdag creation time some xcom data from parent dag, I was searching to achieve this on internet but I didn't find something.

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )

got this error with my code

1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath)
airflow_1  |   File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1  |     module = _load(spec)
airflow_1  |   File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1  |   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1  |   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1  |   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1  |     'load_tasks', args),
airflow_1  |   File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1  |     for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1  | TypeError: xcom_pull() missing 1 required positional argument: 'context'
like image 593
kederrac Avatar asked Feb 18 '19 10:02

kederrac


People also ask

How do you find the context of Airflow?

When Airflow runs a task, it collects several variables and passes these to the context argument on the execute() method. These variables hold information about the current task, you can find the list here: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html#default-variables.

What is upstream and downstream in Airflow?

Upstream task: A task that must reach a specified state before a dependent task can run. Downstream task: A dependent task that cannot run until an upstream task reaches a specified state.


1 Answers

The error is simple: you are missing the context argument required by xcom_pull() method. But you really can't just create context to pass into this method; it is a Python dictionary that Airflow passes to anchor methods like pre_execute() and execute() of BaseOperator (parent class of all Operators).

In other words, context becomes available only when Operator is actually executed, not during DAG-definition. And it makes sense because in taxanomy of Airflow, xcoms are communication mechanism between tasks in realtime: talking to each other while they are running.


But at the end of the day Xcoms, just like every other Airflow model, are persisted in backend meta-db. So of course you can directly retrieve it from there (obviously only the XCOMs of tasks that had run in the past). While I don't have a code-snippet, you can have a look at cli.py where they've used the SQLAlchemy ORM to play with models and backend-db. Do understand that this would mean a query being fired to your backend-db every time the DAG-definition file is parsed, which happens rather quickly.


Useful links

  • How can one set a variable for use only during a certain dag_run
  • How to pull xcom value from other task instance in the same DAG run (not the most recent one)?

EDIT-1

After looking at your code-snippet, I got alarmed. Assuming the value returned by xcom_pull() will keep changing frequently, the number of tasks in your dag will also keep changing. This can lead to unpredictable behaviours (you should do a fair bit of research but I don't have a good feeling about it)

I'd suggest you revisit your entire task workflow and condense down to a design where the

  • number of tasks and
  • structure of DAG are known ahead of time (at the time of execution of dag-definition file). You can of-course iterate over a json file / result of a SQL query (like the SQLAlchemy thing mentioned earlier) etc. to spawn your actual tasks, but that file / db / whatever shouldn't be changing frequently.

Do understand that merely iterating over a list to generate tasks is not problematic; what's NOT possible is to have structure of your DAG dependent on result of upstream task. For example you can't have n tasks created in your DAG based on an upstream task calculating value of n at runtime.


So this is not possible

  • Airflow dynamic tasks at runtime
  • Is there a way to create dynamic workflows in Airflow
  • Dynamically create list of tasks

But this is possible (including what you are trying to achieve; even though the way you are doing it doesn't seem like a good idea)

  • Dynamically Generating DAGs in Airflow
  • Airflow DAG dynamic structure
  • etsy/boundary-layer
  • ajbosco/dag-factory

EDIT-2

So as it turns out, generating tasks from output of upstream tasks is possible after all; although it requires significant amount of knowledge of internal workings of Airflow as well as a tinge of creativity.

  • In fact unless you really understand it, I would strongly recommend to stay away from it.
  • But for those who know no bounds here's the trick Proper way to create Dynamic Workflows in Airflow

EDIT-3

Airflow 2.3 added Dynamic Task Mapping. It can be used to iterate over a list and spin up a Task for each item.

like image 54
y2k-shubham Avatar answered Sep 18 '22 22:09

y2k-shubham