Airflow: How to create child operators from inside python_callable in PythonOperator

Tags: python, airflow

I have a simple PythonOperator, defined like so:

loop_records = PythonOperator(
    task_id='loop_records',
    provide_context=True,
    python_callable=loop_topic_records,
    dag=dag,
)

This PythonOperator calls loop_topic_records, defined like so:

import logging

from airflow.operators.dummy_operator import DummyOperator

def loop_topic_records(**context):
    parent_dag = context['dag']
    for i in range(3):
        op = DummyOperator(
            task_id="child_" + str(i),
            dag=parent_dag
        )
        logging.info('Child operator ' + str(i))
        loop_records >> op

The code does not raise any errors, and it even prints Child operator 0..2 in the log. However, in the DAG's Graph view I do not see the child operators; I only see the loop_records node, as if my DAG consisted of a single operator. What is wrong with this, and how can I fix it?

Asked Mar 02 '23 by Jacobian

1 Answer

You can't do what you want: each DAG, once loaded by Airflow, is static and can't be altered from a running task. Any alterations you make to the DAG from inside a task are ignored.

What you can do is start other DAGs, using the TriggerMultiDagRunOperator provided by the airflow_multi_dagrun plugin; create a DAG of DAGs, so to speak:

import logging

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.operators.multi_dagrun import TriggerMultiDagRunOperator

def gen_topic_records(**context):
    for i in range(3):
        # generate `DagRunOrder` objects to pass a payload (configuration)
        # to the new DAG runs.
        yield DagRunOrder(payload={"child_id": i})
        logging.info('Triggering topic_record_dag #%d', i)

loop_topic_record_dags = TriggerMultiDagRunOperator(
    task_id='loop_topic_record_dags',
    dag=dag,
    trigger_dag_id='topic_record_dag',
    python_callable=gen_topic_records,
)

The above triggers 3 runs of a DAG named topic_record_dag. Inside the operators in that DAG, you can access whatever was set as the payload via the dag_run.conf object (in templates) or the context['dag_run'].conf reference (in PythonOperator() code, with provide_context=True set), as sketched below.
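
For illustration, here is a minimal sketch of what the triggered topic_record_dag could look like; the process_topic_record callable, the start date and the single-task layout are assumptions for the example, not something the plugin dictates:

from datetime import datetime
import logging

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def process_topic_record(**context):
    # The payload set via DagRunOrder(payload=...) is available as
    # this run's conf dictionary.
    child_id = context['dag_run'].conf['child_id']
    logging.info('Processing topic record #%d', child_id)

# schedule_interval=None: this DAG only runs when triggered.
topic_record_dag = DAG(
    dag_id='topic_record_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
)

process_record = PythonOperator(
    task_id='process_record',
    provide_context=True,
    python_callable=process_topic_record,
    dag=topic_record_dag,
)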

If you need to do additional work once those 3 DAGs are done, all you need to do is add a sensor to the above DAG. Sensors are operators that wait until a specific piece of outside information is available. Use one here that fires when all the child DAGs are done. The same plugin has a MultiDagRunSensor that is exactly what you need here: it triggers when all DAG runs started by a TriggerMultiDagRunOperator task are finished (succeeded or failed):

from airflow.operators.multi_dagrun import MultiDagRunSensor

wait_for_topic_record_dags = MultiDagRunSensor(
    task_id='wait_for_topic_record_dags',
    dag=dag,
)

loop_topic_record_dags >> wait_for_topic_record_dags

Then put further operators after that sensor; they will only run once all the child DAG runs have reached a final state, as in the sketch below.
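
Continuing the same DAG file as above (reusing the PythonOperator and logging imports from earlier), a sketch of such a downstream step; summarize_topic_records is a hypothetical callable, not something the plugin provides:

def summarize_topic_records(**context):
    # Hypothetical follow-up step; by the time this runs, every child
    # DAG run triggered above has finished (succeeded or failed).
    logging.info('All topic_record_dag runs finished.')

summarize = PythonOperator(
    task_id='summarize_topic_records',
    provide_context=True,
    python_callable=summarize_topic_records,
    dag=dag,
)

wait_for_topic_record_dags >> summarize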

Answered Apr 08 '23 by Martijn Pieters