I want to generate multiple airflow dags using one script. The dag names should be "test_parameter". Below is my script:
from datetime import datetime

# Importing Airflow modules
from airflow.models import DAG
from airflow.operators import DummyOperator

# Specifying the default arguments for the DAG
default_args = {
    'owner': 'Test',
    'start_date': datetime.now()
}

parameter_list = ["abc", "pqr", "xyz"]

for parameter in parameter_list:
    dag = DAG("test_"+parameter,
              default_args=default_args,
              schedule_interval=None)
    dag.doc_md = "This is a test dag"

    # Creating Start Dummy Operator
    start = DummyOperator(
        task_id="start",
        dag=dag)

    # Creating End Dummy Operator
    end = DummyOperator(
        task_id="end",
        dag=dag)

    # Design workflow of tasks in the dag
    end.set_upstream(start)
So in this case, it should create 3 dags: "test_abc", "test_pqr" and "test_xyz".
But on running the script, it creates only one dag, "test_xyz". Any insights on how to solve this issue? Thanks in advance :)
A DAGRun is an instance of your DAG with an execution date in Airflow. Once you know what a DAG is, the next question is: what is a "node" in the context of Airflow, and what is an Airflow Operator? In an Airflow DAG, nodes are operators. In other words, a task in your DAG is an operator.
DAGFactory: DAGFactory will collect and initialize DAGs from the individual projects under a specific folder; in my case it was airflow/projects. Airflow initializes all the DAGs under the airflow/dags directory, so we will install DAGFactory there.
The first step is to import the classes you need. To create a DAG in Airflow, you always have to import the DAG class. After the DAG class come the imports of Operators. Basically, for each Operator you want to use, you have to make the corresponding import. For example, if you want to execute a Python function, you have to import the PythonOperator.
Yes, it's possible. You can save the config for each DAG in some storage. For example, you can keep the configuration in persistent storage (a DB), then fetch it and save the result in a cache. We did this mainly to prevent the DAG script from fetching the configuration from the DB every time the script is refreshed. Instead, we use a cache and store its expiry time alongside it. You can refer to this article on how to create a dynamic DAG
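from airflow.models import DAG

for i in range(10):
    dag_id = 'foo_{}'.format(i)
    # Bind each DAG to its own module-level name so Airflow can discover it
    globals()[dag_id] = DAG(dag_id)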
In turn, you may also want to create dynamic sub-DAGs and dynamic tasks. Hope it helps :-)
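The caching idea described in this answer could look roughly like the sketch below. It is only an illustration under assumptions: `fetch_config_from_db`, the cache file location, and the 300-second expiry are all hypothetical stand-ins, not anything from the answer or from Airflow itself. A file is used for the cache here because the DAG script may be re-parsed in a fresh process, where an in-memory cache would be lost.

```python
import json
import os
import tempfile
import time

# Hypothetical cache location and expiry; pick values that suit your setup.
CACHE_PATH = os.path.join(tempfile.gettempdir(), "dag_config_cache.json")
CACHE_TTL_SECONDS = 300


def fetch_config_from_db():
    """Placeholder for the real database query (hypothetical)."""
    return {"parameters": ["abc", "pqr", "xyz"]}


def load_config(fetch=fetch_config_from_db, path=CACHE_PATH,
                ttl=CACHE_TTL_SECONDS, now=time.time):
    """Return the cached config while it is fresh, otherwise refetch it."""
    try:
        with open(path) as handle:
            cached = json.load(handle)
        if now() - cached["fetched_at"] < ttl:
            return cached["config"]
    except (OSError, ValueError, KeyError, TypeError):
        pass  # missing or unreadable cache: fall through to a fresh fetch

    config = fetch()
    # Store the fetch time next to the config so expiry can be checked later.
    with open(path, "w") as handle:
        json.dump({"fetched_at": now(), "config": config}, handle)
    return config
```

The DAG script would then call `load_config()` at the top and loop over the returned parameters, so the DB is only hit when the cached copy has expired.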
I guess the problem is that the dag objects 'start' and 'end' get overwritten by the for loop, hence only the last value is retained.
It is odd that, although DAGs created this way in a loop are not all picked up, you can create tasks dynamically through a loop. Maybe that helps.
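To make the overwriting concrete, here is a small sketch that runs without Airflow. `FakeDAG` and `collect_dags` are hypothetical stand-ins: the assumption is that Airflow discovers DAGs by looking for DAG objects bound to top-level names in the script's namespace, which `collect_dags` imitates on a plain dict.

```python
class FakeDAG:
    """Stand-in for airflow.models.DAG so the sketch runs without Airflow."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def collect_dags(namespace):
    """Imitate discovery: keep only DAG objects bound to a name."""
    return sorted(obj.dag_id for obj in namespace.values()
                  if isinstance(obj, FakeDAG))


parameter_list = ["abc", "pqr", "xyz"]

# Pattern from the question: every iteration rebinds the same name `dag`,
# so only the object from the last iteration is still reachable by name.
rebinding_ns = {}
for parameter in parameter_list:
    rebinding_ns["dag"] = FakeDAG("test_" + parameter)

# globals()-style pattern: each DAG is stored under its own name.
unique_ns = {}
for parameter in parameter_list:
    dag_id = "test_" + parameter
    unique_ns[dag_id] = FakeDAG(dag_id)

print(collect_dags(rebinding_ns))  # ['test_xyz']
print(collect_dags(unique_ns))     # ['test_abc', 'test_pqr', 'test_xyz']
```

In a real DAG file the dict would be `globals()` and `FakeDAG` would be the actual `DAG` class, with the start and end tasks created inside the same loop iteration.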
for i in range(3):
    t1 = BashOperator(
        task_id='Success_test'+str(i),
        bash_command='cd home',
        dag=dag)
    slack_notification.set_upstream(t1)