I am using a main DAG (main_dag) that contains a number of subdags, each of which has a number of tasks. I pushed an XCom from taskA in subdagA, but I need to pull that XCom in taskB of subdagB. Because the dag_id argument of xcom_pull() defaults to self.dag_id, I have been unable to pull the XCom I need. How can I do this, or is there a better way to set this scenario up so that I don't have to deal with it?
Example of what I am currently doing in subdagB:
def subdagB(parent_dag, child_dag, start_date, schedule_interval):
    subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval)
    start = DummyOperator(
        task_id='taskA',
        dag=subdagB)
    tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};'''
    t1 = BashOperator(
        task_id='taskB',
        bash_command=tag_db_template,
        xcom_push=True,
        dag=subdagB)
    end = DummyOperator(
        task_id='taskC',
        dag=subdagB)
    t0.set_upstream(start)
    t1.set_upstream(t0)
    end.set_upstream(t1)
    return subdagB
Thank you in advance for any help!
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
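To make the identification rule concrete, here is a toy model in plain Python. It is only a sketch: ToyXComStore and its methods are hypothetical names made up for illustration, not Airflow's storage or API, and the "main_dag" parent id is assumed from the question. It shows why a pull that uses the consumer's own dag_id finds nothing when the value was pushed from a different subdag.

```python
# Toy model only: ToyXComStore is a hypothetical class, not part of Airflow.
class ToyXComStore:
    def __init__(self):
        # Each value is identified by (dag_id, task_id, key).
        self._rows = {}

    def push(self, dag_id, task_id, value, key="return_value"):
        self._rows[(dag_id, task_id, key)] = value

    def pull(self, dag_id, task_id, key="return_value"):
        # Returns None when no entry matches all three identifiers.
        return self._rows.get((dag_id, task_id, key))


store = ToyXComStore()
# taskA in subdagA pushes a value (the ids here are assumed from the question).
store.push("main_dag.subdagA", "taskA", "tag_db_value")

# Looking under the consumer's own dag_id misses the value...
missed = store.pull("main_dag.subdagB", "taskA")
# ...while naming the producing subdag's dag_id finds it.
found = store.pull("main_dag.subdagA", "taskA")
```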
SubDAGs were a legacy feature in Airflow that allowed users to implement reusable patterns of tasks in their DAGs.
You should be fine as long as you override the dag_id in [Operator].xcom_pull(dag_id=dag_id, ...) or [TaskInstance].xcom_pull(dag_id=dag_id, ...).
Just make sure that
dag_id = "{parent_dag_id}.{child_dag_id}"
If you can make your example more complete I can try running it locally, but I tested a (similar) example and cross-subdag xcoms work as expected.
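As a rough sketch of the naming rule above, the two helpers below build the subdag's dag_id and the templated command string in plain Python, without importing Airflow. The helper names subdag_id and xcom_pull_command are hypothetical, and "main_dag" is assumed to be the parent DAG's id as in the question; adjust both to your own setup.

```python
# Hypothetical helpers for illustration; they only build strings.
def subdag_id(parent_dag_id, child_dag_id):
    # Follows the "{parent_dag_id}.{child_dag_id}" convention from the answer.
    return "{}.{}".format(parent_dag_id, child_dag_id)


def xcom_pull_command(source_dag_id, source_task_id):
    # Jinja template text for a bash_command; Airflow would render it at run time.
    return (
        "echo {{ task_instance.xcom_pull("
        "dag_id='%s', task_ids='%s') }};" % (source_dag_id, source_task_id)
    )


# Assumed ids: the XCom was pushed by taskA inside subdagA of main_dag.
source = subdag_id("main_dag", "subdagA")
command = xcom_pull_command(source, "taskA")
print(command)
```

The resulting string could be passed as bash_command for taskB in subdagB, so the pull names the producing subdag explicitly instead of relying on the default dag_id.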