I have a main DAG that retrieves a file and splits its data into separate CSV files. Another set of tasks must then run for each of these CSV files (e.g. uploading to GCS, inserting into BigQuery). How can I generate a SubDag for each file dynamically, based on the number of files? Each SubDag would define tasks such as uploading to GCS, inserting into BigQuery, and deleting the CSV file.
So right now, this is what it looks like
main_dag = DAG(....)
download_operator = SFTPOperator(dag = main_dag, ...) # downloads file
transform_operator = PythonOperator(dag = main_dag, ...) # Splits data and writes csv files
def subdag_factory():  # Will return a subdag with tasks for uploading to GCS, inserting to BigQuery.
    ...
    ...
How can I call the subdag_factory for each file generated in transform_operator?
Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
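As a rough illustration of the model only (plain Python, not Airflow's API), the map step produces one unit of work per input and the optional reduce step sees all of the collected results. The file names, the bucket name, and the `upload` and `summarize` functions are hypothetical placeholders. In Airflow 2.3 and later, the map step corresponds to calling `.expand()` on a task.

```python
from typing import Callable, Iterable, List


def upload(csv_path: str) -> str:
    """Hypothetical per-file task: pretend to upload and return a target URI."""
    return "gs://example-bucket/" + csv_path


def summarize(uris: List[str]) -> int:
    """Hypothetical reduce step: operates on the collected outputs of the map step."""
    return len(uris)


def map_then_reduce(
    inputs: Iterable[str],
    mapper: Callable[[str], str],
    reducer: Callable[[List[str]], int],
) -> int:
    # Map: one independent call per input (Airflow would create one task each).
    mapped = [mapper(item) for item in inputs]
    # Reduce: a single call that receives every mapped result.
    return reducer(mapped)


csv_files = ["part_0.csv", "part_1.csv", "part_2.csv"]
uploaded_count = map_then_reduce(csv_files, upload, summarize)
```

The point of the analogy is that the number of map calls is decided by the length of the input list at run time, which is what the question asks for.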
Dynamic DAGs with globals(): Because everything in Airflow is code, you can dynamically generate DAGs using Python alone. As long as a DAG object in globals() is created by Python code that lives in the dags_folder, Airflow will load it.
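A minimal sketch of the globals() pattern, assuming a stand-in `FakeDAG` class so that it runs without Airflow installed. `FakeDAG`, `register_dags`, and the `process_` prefix are hypothetical names; in a real DAG file the objects would be `airflow.DAG` instances and the namespace would be the module's own `globals()`.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, MutableMapping


@dataclass
class FakeDAG:
    """Stand-in for airflow.DAG, used only to keep this sketch self-contained."""
    dag_id: str


def register_dags(namespace: MutableMapping[str, object], names: Iterable[str]) -> None:
    """Create one DAG-like object per name and bind it under its dag_id."""
    for name in names:
        dag_id = "process_" + name
        # Binding the object in the module namespace is what makes it discoverable.
        namespace[dag_id] = FakeDAG(dag_id=dag_id)


# In a real DAG file this would be register_dags(globals(), ...) at module level.
module_namespace: Dict[str, object] = {}
register_dags(module_namespace, ["sales", "users"])
```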
I tried creating subdags dynamically as follows:
# Import paths below are the Airflow 1.10 ones; they differ in later versions.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.helpers import chain

# default_args, dag_id_parent, db_names, check_sync_enabled, spark_submit and
# tid_prefix_subdag (e.g. 'subdag_') are assumed to be defined elsewhere in this file.


# create and return a DAG
def create_subdag(dag_parent, dag_id_child_prefix, db_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix + db_name)
    default_args_copy = default_args.copy()

    # dag
    dag = DAG(dag_id=dag_id_child,
              default_args=default_args_copy,
              schedule_interval='@once')

    # operators
    tid_check = 'check2_db_' + db_name
    py_op_check = PythonOperator(task_id=tid_check, dag=dag,
                                 python_callable=check_sync_enabled,
                                 op_args=[db_name])

    tid_spark = 'spark2_submit_' + db_name
    py_op_spark = PythonOperator(task_id=tid_spark, dag=dag,
                                 python_callable=spark_submit,
                                 op_args=[db_name])

    py_op_check >> py_op_spark
    return dag


# wrap DAG into SubDagOperator
def create_subdag_operator(dag_parent, db_name):
    # SubDagOperator requires the child dag_id to be '<parent dag_id>.<task_id>',
    # so the task id reuses the same prefix that is passed to create_subdag
    tid_subdag = tid_prefix_subdag + db_name
    subdag = create_subdag(dag_parent, tid_prefix_subdag, db_name)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op


# create SubDagOperator for each db in db_names
def create_all_subdag_operators(dag_parent, db_names):
    subdags = [create_subdag_operator(dag_parent, db_name) for db_name in db_names]
    # chain subdag-operators together
    chain(*subdags)
    return subdags


# (top-level) DAG & operators
dag = DAG(dag_id=dag_id_parent,
          default_args=default_args,
          schedule_interval=None)
subdag_ops = create_all_subdag_operators(dag, db_names)
Note that the list of inputs for which subdags are created, here db_names, can either be declared statically in the Python file or be read from an external source.
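One possible way to read the input list from an external source is a small JSON file. This is only a sketch: the `load_db_names` helper, the file name, and the `db_names` key are assumptions made for illustration, not part of the original answer.

```python
import json
import os
import tempfile
from typing import List


def load_db_names(config_path: str) -> List[str]:
    """Read the list of database names from a JSON file (hypothetical layout)."""
    with open(config_path) as handle:
        config = json.load(handle)
    # Sort and de-duplicate so task ids come out in a stable order on every parse.
    return sorted(set(config["db_names"]))


# Demonstration with a throwaway config file.
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "dbs.json")
    with open(demo_path, "w") as handle:
        json.dump({"db_names": ["orders", "users", "orders"]}, handle)
    db_names = load_db_names(demo_path)
```

Because Airflow re-parses DAG files regularly, whatever source is used should be cheap to read and should return the same names on every parse unless the configuration really changed.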
The resulting DAG looks like this (screenshot not included).
Diving into SubDAG(s)
Airflow can deal with dynamic DAGs in two different ways.
One way is to define your dynamic DAG in one Python file and put it into dags_folder. The file generates DAGs based on an external source (config files in another directory, SQL, NoSQL, etc.). The fewer changes to the structure of the DAG, the better (which is really true in all situations). For instance, our DAG file generates a DAG for every record (or file), and it generates the dag_id as well. On every Airflow scheduler heartbeat, this code goes through the list and generates the corresponding DAGs.
Pros: not many, just that there is only one code file to change.
Cons: a lot, and they come down to the way Airflow works. For every new DAG (dag_id), Airflow writes its steps into the database, so when the number of steps or the name of a step changes, it might break the web server. When you delete a DAG from your list, it becomes a kind of orphan: you can't access it from the web interface and you have no control over it; you can't see the steps, you can't restart it, and so on. If you have a static list of DAGs whose IDs are not going to change but whose steps occasionally do, this method is acceptable.
So at some point I came up with another solution. You have static DAGs (they are still dynamic in that a script generates them, but their structure and IDs do not change). Instead of one script that walks through a list, such as a directory listing, and generates DAGs, you create two static DAGs: one monitors the directory periodically (*/10 * * * *), and the other is triggered by the first. When a new file or files appear, the first DAG triggers the second one with a conf argument. The following code has to be executed for every file in the directory.
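import logging
from datetime import datetime

from airflow import settings
from airflow.models import DagRun

# dag_to_be_triggered, uuid_run_id and path_to_the_file are assumed to be set by the caller
session = settings.Session()
dr = DagRun(
    dag_id=dag_to_be_triggered,
    run_id=uuid_run_id,
    conf={'file_path': path_to_the_file},
    execution_date=datetime.now(),
    start_date=datetime.now(),
    external_trigger=True)
logging.info("Creating DagRun {}".format(dr))
session.add(dr)
session.commit()
session.close()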
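The per-file loop around that snippet could be sketched as follows. This is an assumption-laden illustration in plain Python: `build_trigger_payloads` is a hypothetical helper, the `file_` run id prefix is made up, and the in-memory `already_seen` set stands in for whatever persistent bookkeeping the monitor DAG would really need between runs.

```python
import os
import tempfile
import uuid
from typing import Dict, List, Set


def build_trigger_payloads(directory: str, already_seen: Set[str]) -> List[Dict[str, object]]:
    """Return one run_id/conf pair for every file not seen on an earlier pass."""
    payloads = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path) or path in already_seen:
            continue
        payloads.append({
            "run_id": "file_" + uuid.uuid4().hex,  # unique id per triggered run
            "conf": {"file_path": path},           # read later by the triggered DAG
        })
        already_seen.add(path)
    return payloads


# Demonstration: two files are picked up once, then ignored on the next pass.
with tempfile.TemporaryDirectory() as tmp_dir:
    for name in ("a.csv", "b.csv"):
        open(os.path.join(tmp_dir, name), "w").close()
    seen: Set[str] = set()
    first_pass = build_trigger_payloads(tmp_dir, seen)
    second_pass = build_trigger_payloads(tmp_dir, seen)
```

Each returned payload supplies the `run_id` and `conf` values that one DagRun would be created with.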
The triggered DAG can receive the conf arg and finish all the required tasks for the particular file. To access the conf param use this:
def work_with_the_file(**context):
    path_to_file = context['dag_run'].conf['file_path'] \
        if 'file_path' in context['dag_run'].conf else None
    if not path_to_file:
        raise Exception('path_to_file must be provided')
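A slightly more defensive variant of the same lookup, written so that it can be exercised without Airflow. The `get_file_path` name and the `SimpleNamespace` stand-in for the DagRun object are assumptions for illustration; the only thing taken from the answer is that the path arrives under the `file_path` key of the run's conf.

```python
from types import SimpleNamespace


def get_file_path(**context):
    """Return conf['file_path'] from the triggering run, or fail loudly."""
    # conf may be missing or empty when the run was not started with a conf argument.
    conf = context["dag_run"].conf or {}
    path_to_file = conf.get("file_path")
    if not path_to_file:
        raise ValueError("file_path must be provided in the DagRun conf")
    return path_to_file


# Stand-in for the context Airflow passes to a Python callable.
fake_context = {"dag_run": SimpleNamespace(conf={"file_path": "/tmp/part_0.csv"})}
resolved = get_file_path(**fake_context)
```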
Pros: all the flexibility and functionality of Airflow.
Cons: the monitor DAG can be spammy.