I have a simple PythonOperator, defined like so:
loop_records = PythonOperator(
    task_id='loop_records',
    provide_context=True,
    python_callable=loop_topic_records,
    dag=dag,
)
This operator calls loop_topic_records, defined like so:
def loop_topic_records(**context):
    parent_dag = context['dag']
    for i in range(3):
        op = DummyOperator(
            task_id="child_" + str(i),
            dag=parent_dag,
        )
        logging.info('Child operator ' + str(i))
        loop_records >> op
The code does not raise any errors. It even prints Child operator 0..2 in the log. However, in the DAG's Graph view I do not see the child operators; I only see the loop_records node, as if my DAG consisted of a single operator. What is wrong here, and how can I fix it?
You can't do what you want. Each DAG, once loaded by Airflow, is static, and can't be altered from a running task. Any alterations you make to the DAG from inside a task are ignored.
What you can do is start other DAGs, using the Multi DAG run operator provided by the airflow_multi_dagrun plugin; create a DAG of DAGs, so to speak:
import logging

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator

def gen_topic_records(**context):
    for i in range(3):
        # generate `DagRunOrder` objects to pass a payload (configuration)
        # to the new DAG runs.
        yield DagRunOrder(payload={"child_id": i})
        logging.info('Triggering topic_record_dag #%d', i)

loop_topic_record_dags = TriggerMultiDagRunOperator(
    task_id='loop_topic_record_dags',
    dag=dag,
    trigger_dag_id='topic_record_dag',
    python_callable=gen_topic_records,
)
The above will trigger a DAG named topic_record_dag to be started 3 times. Inside the operators of that DAG, you can access whatever was set as the payload via the dag_run.conf object (in templates) or via context['dag_run'].conf (in PythonOperator code, with provide_context=True set).
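For illustration, here is a minimal sketch of what topic_record_dag itself might look like; the DAG arguments, task ids, and the process_topic_record callable are assumptions for the example, not something the plugin provides:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

# triggered DAGs typically have no schedule of their own
child_dag = DAG(
    dag_id='topic_record_dag',
    schedule_interval=None,
    start_date=datetime(2019, 1, 1),
)

def process_topic_record(**context):
    # read the payload that gen_topic_records() put in the DagRunOrder
    child_id = context['dag_run'].conf['child_id']
    logging.info('Processing child %s', child_id)

process_record = PythonOperator(
    task_id='process_topic_record',
    provide_context=True,
    python_callable=process_topic_record,
    dag=child_dag,
)

# the same payload is available in templated fields via dag_run.conf
echo_record = BashOperator(
    task_id='echo_child_id',
    bash_command='echo "child_id={{ dag_run.conf[\'child_id\'] }}"',
    dag=child_dag,
)

process_record >> echo_record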
If you need to do additional work once those 3 DAGs are done, all you need to do is add a sensor to the above DAG. Sensors are operators that wait until a specific piece of outside information is available. Use one here that fires when all the child DAGs are done. The same plugin has a MultiDagRunSensor that's exactly what you need: it triggers when all DAGs started by a TriggerMultiDagRunOperator task are finished (succeeded or failed):
from airflow import DAG
from airflow.operators.multi_dagrun import MultiDagRunSensor

wait_for_topic_record_dags = MultiDagRunSensor(
    task_id='wait_for_topic_record_dags',
    dag=dag,
)

loop_topic_record_dags >> wait_for_topic_record_dags
Then put further operators after that sensor.
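For example, a minimal sketch extending the DAG above; summarize_results is a hypothetical callable, not part of the plugin:

import logging

from airflow.operators.python_operator import PythonOperator

def summarize_results(**context):
    # hypothetical follow-up work; runs only after every child DAG run finished
    logging.info('All topic_record_dag runs are done.')

summarize = PythonOperator(
    task_id='summarize_results',
    provide_context=True,
    python_callable=summarize_results,
    dag=dag,
)

wait_for_topic_record_dags >> summarize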