
Launch a subdag with variable parallel tasks in airflow

I have an airflow workflow that I'd like to modify (see illustration at the bottom).
However, I couldn't find a way to do that in the docs.

I've looked at subdags, branching and xcoms without luck.

There doesn't seem to be a way to specify how many tasks should run in parallel in a subdag based on the value returned by an operator.
To add to the problem, each task in the subdag receives a different parameter (an element from the list returned by the previous operator).

This is an illustration of what I'm trying to do: [image: airflow description]

PhilipGarnero asked Oct 18 '22 00:10


1 Answer

I've run into this as well and haven't really found a clean way to address it. If you know all the different possible parameters you would pass to each subdag, you can hardcode them into the DAG file and always create the DAG with every possible subdag. Then you have an operator (similar to your "get every n") which fetches the list of subdags you want to run and marks any downstream subdag not in the list as skipped. Something like this:

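# Import paths as in Airflow 1.x; `dag`, `fetch_list` and `my_subdag` are defined elsewhere.
from datetime import datetime

from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils.state import State

# Every subdag that could ever be selected, keyed by name.
SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}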

def _select_subdags(**context):
    names = fetch_list()  # returns ["a", "c", "d"]
    tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]

    session = Session()
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == context['dag'].dag_id, 
        TaskInstance.execution_date == context['ti'].execution_date,
        TaskInstance.task_id.in_(tasks_to_skip),
    )
    for ti in tis:
        now = datetime.utcnow()
        ti.state = State.SKIPPED
        ti.start_date = now
        ti.end_date = now
        session.merge(ti)
    session.commit()
    session.close()

select_subdags = PythonOperator(
    task_id='select_subdags',
    dag=dag,
    provide_context=True,
    python_callable=_select_subdags,
)

for name, params in SUBDAGS.items():  # .items() works on both Python 2 and 3
    child_dag_id = 'my_subdag_' + name
    subdag_op = SubDagOperator(
        task_id=child_dag_id,
        dag=dag,
        subdag=my_subdag(dag.dag_id, child_dag_id, params),
    )
    select_subdags >> subdag_op

Obviously not ideal, especially when you end up wanting to run just one subdag out of hundreds. We've also run into some performance issues with thousands of subdags in a single DAG, as it can lead to tons of task instances, the majority of which are simply skipped.
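
The selection step above boils down to a set difference between the hardcoded subdag names and the names you actually want to run. Here is a minimal sketch of just that part in plain Python, so it can be checked without an Airflow install. The function name `tasks_to_skip` and the check for unknown names are my own additions for illustration, not part of the original snippet or of Airflow:

```python
def tasks_to_skip(all_names, selected_names, prefix="my_subdag_"):
    """Return the sorted task ids of the subdags that were NOT selected.

    Hypothetical helper: `all_names` is the hardcoded set of subdag names,
    `selected_names` is whatever the upstream fetch returned.
    """
    unknown = set(selected_names) - set(all_names)
    if unknown:
        # A name with no matching hardcoded subdag can never run, so fail loudly.
        raise ValueError("unknown subdag names: %s" % sorted(unknown))
    return sorted(prefix + name for name in set(all_names) - set(selected_names))


SUBDAGS = {
    "a": {"id": "foo"},
    "b": {"id": "bar"},
    "c": {"id": "test"},
    "d": {"id": "hi"},
}

print(tasks_to_skip(SUBDAGS, ["a", "c", "d"]))  # prints ['my_subdag_b']
```

Failing on unknown names is a design choice: with this approach a subdag that was never hardcoded simply cannot be run, so it seems better to surface that than to silently skip it.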

Daniel Huang answered Oct 19 '22 23:10