
Pulling xcom from sub dag

Tags: python, airflow

I am using a main DAG (main_dag) that contains a number of subdags, and each of those subdags has a number of tasks. I push an XCom from taskA in subdagA and need to pull it from taskB in subdagB. Because the dag_id argument of xcom_pull() defaults to self.dag_id, I have been unable to pull that XCom. How can I do this, or is there a better way to set this scenario up so that I don't have to deal with the problem at all?

Example of what I am currently doing in subdagB:

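# Airflow 1.x import paths, matching the xcom_push argument used below.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

def subdagB(parent_dag, child_dag, start_date, schedule_interval):
    # The subdag's dag_id is built as '<parent_dag>.<child_dag>'.
    subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
    start = DummyOperator(
        task_id='taskA',
        dag=subdagB)
    # Pull the XCom pushed by taskA in subdagA; dag_id is passed explicitly
    # because it otherwise defaults to this subdag's own dag_id.
    tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
    t1 = BashOperator(
        task_id='taskB',
        bash_command=tag_db_template,
        xcom_push=True,
        dag=subdagB)
    end = DummyOperator(
        task_id='taskC',
        dag=subdagB)
    # Run order: taskA -> taskB -> taskC
    t1.set_upstream(start)
    end.set_upstream(t1)
    return subdagB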

Thank you in advance for any help!

amadorschulze92 asked Apr 19 '17 16:04



1 Answer

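You should be fine as long as you override the dag_id argument in either

  • [Operator].xcom_pull(dag_id=dag_id, ...) or
  • [TaskInstance].xcom_pull(dag_id=dag_id, ...)

Just make sure that dag_id is the full ID of the subdag that pushed the XCom, which has the form

dag_id = "{parent_dag_id}.{child_dag_id}"

For the example in the question, that would presumably be main_dag.subdagA rather than dag.main_dag.subdagA, assuming the parent DAG's dag_id is main_dag and subdagA is named the same way subdagB is.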

If you can make your example more complete, I can try running it locally, but I tested a similar example and cross-subdag XComs work as expected.
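
To make the naming rule concrete, here is a minimal sketch in plain Python (no Airflow import needed) that builds the subdag ID and the templated bash command. The helper names subdag_id and xcom_pull_template are hypothetical, and the IDs main_dag, subdagA and taskA are assumed from the question; only the xcom_pull(dag_id=..., task_ids=...) call inside the template is Airflow's own.

```python
def subdag_id(parent_dag_id, child_dag_id):
    """Hypothetical helper: full dag_id of a subdag, '<parent>.<child>'."""
    return "{}.{}".format(parent_dag_id, child_dag_id)


def xcom_pull_template(source_dag_id, source_task_id):
    """Hypothetical helper: Jinja-templated bash command that pulls an XCom
    pushed by source_task_id in the DAG named source_dag_id."""
    return (
        "echo {{ task_instance.xcom_pull("
        "dag_id='" + source_dag_id + "', "
        "task_ids='" + source_task_id + "') }}"
    )


# Assumed IDs, taken from the question: parent 'main_dag', child 'subdagA'.
source_dag_id = subdag_id("main_dag", "subdagA")
tag_db_template = xcom_pull_template(source_dag_id, "taskA")
print(tag_db_template)
```

The resulting string could then be passed as bash_command to the BashOperator in subdagB, so that the template is rendered with the pushing subdag's dag_id instead of the default self.dag_id.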

gnicholas answered Oct 03 '22 06:10