
Airflow: Dynamic SubDag creation

I have a use case where I have a list of clients. Clients can be added to or removed from the list, and each can have a different start date and different initial parameters.

I want to use Airflow to backfill all data for each client from its initial start date, and to rerun if something fails. I am thinking about creating a SubDag for each client. Will this address my problem?

How can I dynamically create SubDags based on the client_id?

Tuan Vu asked Aug 17 '17 23:08



1 Answer

You can definitely create DAG objects dynamically:

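from airflow import DAG

def make_client_dag(parent_dag, client):
  # A SubDAG's dag_id must be '<parent dag_id>.<task_id of its SubDagOperator>'.
  return DAG(
    '%s.client_%s' % (parent_dag.dag_id, client.name),
    start_date=client.start_date
  )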

You could then call that function when building a SubDagOperator in your main DAG:

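from airflow.operators.subdag_operator import SubDagOperator  # Airflow 1.x import path

for client in clients:
  SubDagOperator(
    task_id='client_%s' % client.name,  # must match the suffix of the subdag's dag_id
    dag=main_dag,
    subdag=make_client_dag(main_dag, client)
  )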

This creates a SubDAG for each member of the collection clients, and each one runs as a task of the main DAG, starting with its next run. I'm not sure you'll get the backfill behavior you want, though.
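One detail the two snippets rely on is the naming rule: the SubDAG's dag_id has to be the parent's dag_id, a dot, and the task_id of the SubDagOperator that wraps it. As a rough sketch of how those ids line up per client, here is a version that runs without Airflow installed. The Client class, the subdag_ids helper, and the client names and dates are all made up for illustration; they are not part of Airflow or of the original question.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Client:
    # Hypothetical stand-in for whatever object describes a client.
    name: str
    start_date: datetime


def subdag_ids(parent_dag_id, client):
    """Return (task_id, subdag_dag_id) for one client.

    The subdag's dag_id is built from the task_id, so the two
    cannot drift apart.
    """
    task_id = 'client_%s' % client.name
    subdag_dag_id = '%s.%s' % (parent_dag_id, task_id)
    return task_id, subdag_dag_id


# Illustrative data only.
clients = [
    Client('acme', datetime(2017, 1, 1)),
    Client('globex', datetime(2017, 6, 1)),
]

for client in clients:
    task_id, dag_id = subdag_ids('main_dag', client)
    print(task_id, dag_id)
```

Deriving both ids in one place means a client added to the list later gets a matching task_id and dag_id without touching either of the snippets above.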

gcbenison answered Oct 04 '22 00:10
