 

How to generate multiple airflow dags through a single script?

I want to generate multiple Airflow DAGs using one script. The DAG names should follow the pattern "test_&lt;parameter&gt;". Below is my script:

from datetime import datetime

# Importing Airflow modules
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

# Specifying the default arguments for the DAG
default_args = {
    'owner': 'Test',
    'start_date': datetime.now()
    }

parameter_list = ["abc", "pqr", "xyz"]

for parameter in parameter_list:
    dag = DAG("test_"+parameter,
              default_args=default_args,
              schedule_interval=None)
    dag.doc_md = "This is a test dag"

    # Creating Start Dummy Operator
    start = DummyOperator(
        task_id="start",
        dag=dag)

    # Creating End Dummy Operator
    end = DummyOperator(
        task_id="end",
        dag=dag)

    # Design workflow of tasks in the dag
    end.set_upstream(start)

So in this case, it should create 3 dags: "test_abc", "test_pqr" and "test_xyz".

But on running the script, it creates only one DAG, "test_xyz". Any insights on how to solve this issue? Thanks in advance :)

Vatsal Gosaliya asked Mar 16 '18


2 Answers

Yes, it's possible. One approach is to keep the configuration for each DAG in external storage: save it in a persistent store (a database), fetch it from there, and cache the result with an expiry time, so the DAG file doesn't hit the database every time the scheduler re-parses it. The key to generating several DAGs from a single file, though, is to register each DAG object as a module-level variable via globals(), as described in this article on creating dynamic DAGs:

for i in range(10):
  dag_id = 'foo_{}'.format(i)
  globals()[dag_id] = DAG(dag_id)

The same pattern works if you also need dynamic sub-DAGs and dynamic tasks. Hope it helps :-)
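The mechanism behind the globals() trick can be seen without Airflow at all. The sketch below uses a hypothetical FakeDag class as a stand-in for airflow's DAG, but the registration pattern is the same one Airflow relies on when it scans a DAG file for module-level DAG objects:

```python
# Stand-in for airflow.models.DAG, used here only to illustrate
# why module-level registration matters.
class FakeDag:
    def __init__(self, dag_id):
        self.dag_id = dag_id

parameter_list = ["abc", "pqr", "xyz"]

for parameter in parameter_list:
    dag_id = "test_" + parameter
    # Registering each object under its own module-level name keeps
    # all three DAGs alive; rebinding a single `dag` variable would
    # leave only the last one when the loop finishes.
    globals()[dag_id] = FakeDag(dag_id)

# All three DAG ids now exist as distinct module-level variables.
print([globals()[n].dag_id for n in ["test_abc", "test_pqr", "test_xyz"]])
```

Because Airflow only picks up DAGs it finds at module level, the globals() assignment is what makes all three visible instead of just the last one.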

irvifa answered Nov 14 '22


I guess the problem is that the dag object (and the 'start' and 'end' tasks bound to it) gets overwritten on each pass of the for loop, so only the last value is retained.

Oddly, while you can't create DAGs dynamically this way, you can create tasks dynamically through a loop. Maybe that helps.

# `dag` and `slack_notification` are defined earlier in the file
for i in range(3):
    t1 = BashOperator(
        task_id='Success_test' + str(i),
        bash_command='cd home',
        dag=dag)

    slack_notification.set_upstream(t1)
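The reason dynamic tasks survive the loop can be checked in plain Python: an object created in a loop stays alive as long as something holds a reference to it, while rebinding a single variable keeps only the last object. A minimal sketch, with a hypothetical Task class standing in for BashOperator:

```python
class Task:
    """Stand-in for an Airflow operator, tracking upstream links."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def set_upstream(self, other):
        self.upstream.append(other)

final = Task("final")
for i in range(3):
    t1 = Task("Success_test" + str(i))  # each iteration creates a new object
    final.set_upstream(t1)              # `final` holds a reference, so it survives

# `t1` alone now points only at Success_test2, but all three task
# objects are still reachable through `final`.
print([t.task_id for t in final.upstream])
```

This is exactly why the tasks in the loop above work: each one is attached to the DAG (and to slack_notification), so the reference persists even though the loop variable is reused.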
wang892 answered Nov 14 '22