
Return value from one Airflow DAG into another one

Tags:

python

airflow

My DAG (let's call it DAG_A) starts another DAG (DAG_B) using the TriggerDagRunOperator. DAG_B's tasks use XCOM, and once that run completes I would like to obtain an XCOM value from one of the tasks of DAG_B's run (specifically the run that DAG_A started).

Use of XCOM is not a hard requirement - basically any (reasonable) mechanism that Airflow itself provides would work. I can change DAG_B if needed.

I can't find any examples of such cases, so I'd appreciate the help.

Plan B would be to make DAG_B save its XCOM values into some persistent storage, such as a DB or a file, together with some run id, and have DAG_A read them from there. But I would like to avoid such complications if a built-in mechanism is available.

asked Apr 28 '21 11:04 by mrzodiak


1 Answer

You can pull XCOM values from another DAG by passing its dag_id to xcom_pull() (see the task_instance.xcom_pull() function documentation). This works as long as you triggered the other DAG using the same execution date as your current DAG run. That's easily achieved by templating the execution_date value:

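from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Trigger DAG_B with the same execution date as the current DAG_A run,
# so the two runs can be matched up when pulling XCOM values later.
trigger = TriggerDagRunOperator(
    task_id="trigger_dag_b",
    trigger_dag_id="DAG_B",
    execution_date="{{ execution_date }}",
    ...
)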

Then, provided you either used an ExternalTaskSensor to wait for the specific task to complete or set wait_for_completion=True on your TriggerDagRunOperator() task, you can later pull the XCOM with task_instance.xcom_pull(dag_id="DAG_B", ...) (add the task ids and/or the XCOM key you want to pull).
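As a minimal sketch of that pull step, the function below could serve as the python_callable of a PythonOperator in DAG_A, placed downstream of the trigger. The function name pull_from_dag_b, the task id "produce_result", and the error handling are hypothetical; the sketch assumes Airflow 2 passes the current task instance as ti because the callable names it in its signature, and that DAG_B ran with the same execution date.

```python
def pull_from_dag_b(ti, task_id="produce_result", key="return_value"):
    """Fetch an XCom pushed by a DAG_B task for the matching execution date.

    `ti` is the current task instance (anything exposing xcom_pull()).
    `task_id` is a hypothetical task name in DAG_B; "return_value" is the
    key Airflow uses for values returned from a task.
    """
    value = ti.xcom_pull(dag_id="DAG_B", task_ids=task_id, key=key)
    if value is None:
        # Nothing found: DAG_B may not have finished, or the execution
        # dates of the two runs may not match.
        raise ValueError(f"no XCom {key!r} from DAG_B task {task_id!r}")
    return value
```

Raising when nothing is found is a design choice for the sketch: it makes a mismatched execution date show up as a failed task rather than a silent None.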

If you are not averse to coding a Python operator, you can also import the XCom model and just use its XCom.get_one() method directly:

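from airflow.models import XCom

# `ti` is the current task instance, e.g. taken from the task context.
value = XCom.get_one(
    execution_date=ti.execution_date,
    key="target key",
    task_id="some.task.id",
    dag_id="DAG_B",
)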

I've used similar techniques with a multi-dagrun trigger (to process a variable number of resources); this is trickier, as in that case you can't reuse the execution date (each dagrun must have a unique (dag_id, execution_date) tuple).

In those cases I either used direct queries (joining the SQLAlchemy XCom model against the DagRun model using dagrun ids stored in an XCom by the trigger, instead of relying on the execution date matching), or avoided the whole issue by configuring the triggered DAGs up front. The latter means passing the triggered DAG a configuration that tells it where to write its results, which the parent DAG then picks up. The documentation doesn't appear to mention this properly, but the conf argument to TriggerDagRunOperator() supports templating too, so you can generate a dictionary there as input to the triggered DAG; tasks in that DAG then reference the configuration via params.
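To illustrate the configuration approach, here is a rough sketch in plain Python with no Airflow imports. All names (build_conf, write_result, read_result, the "output_path" key) are hypothetical, and it assumes both DAGs can reach the same filesystem. The dictionary from build_conf would be what the parent passes as conf, a task in the triggered DAG would call write_result with the conf it received, and the parent would call read_result once the triggered run has completed.

```python
import json
import re
from pathlib import Path


def build_conf(parent_run_id, base_dir):
    """Parent side: build the conf dict to hand to the triggered DAG."""
    # Run ids can contain ':' and '+', so keep only filename-safe characters.
    safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", parent_run_id)
    return {"output_path": str(Path(base_dir) / f"{safe_id}.json")}


def write_result(conf, result):
    """Triggered-DAG side: write the result where the conf says to."""
    path = Path(conf["output_path"])
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(result))


def read_result(conf):
    """Parent side: read the result back after the triggered run is done."""
    return json.loads(Path(conf["output_path"]).read_text())
```

Because the parent chooses the output location per run, this does not depend on the two runs sharing an execution date.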

answered Oct 30 '22 19:10 by Martijn Pieters