I have an Airflow workflow that I'd like to modify (see the illustration at the bottom).
However, I couldn't find a way to do that in the docs.
I've looked at subdags, branching and xcoms without luck.
There doesn't seem to be a way to specify how many tasks should run in parallel in a subdag based on the return value of an upstream operator. To complicate things further, each task in the subdag receives a different parameter (one element of the list returned by that operator).
This is an illustration of what I'm trying to do:
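(Sketched in text; "get_list" and "process" are placeholder names, not my real operators.)

get_list                     # an operator that returns a list, e.g. [x1, ..., xn]
    └── subdag
         ├── process(x1)     # n parallel tasks,
         ├── process(x2)     # one per list element
         ├── ...
         └── process(xn)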
I've run into this as well and haven't really found a clean way to address it. If you know all the possible parameters you might pass to each subdag, then you can hardcode them into the DAG file and always create every possible subdag. Then have an operator (similar to your "get every n") fetch the list of subdags that should actually run, and mark any downstream subdag not in that list as skipped. Something like this:
from datetime import datetime

from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils.state import State

# Every subdag that could ever run, keyed by name.
SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}
def _select_subdags(**context):
    # fetch_list() is whatever call produces the names of the subdags
    # that should run for this execution, e.g. ["a", "c", "d"].
    names = fetch_list()
    tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]

    session = Session()
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == context['dag'].dag_id,
        TaskInstance.execution_date == context['ti'].execution_date,
        TaskInstance.task_id.in_(tasks_to_skip),
    )
    now = datetime.utcnow()
    for ti in tis:
        # Mark the task instance as skipped so the scheduler never queues it.
        ti.state = State.SKIPPED
        ti.start_date = now
        ti.end_date = now
        session.merge(ti)
    session.commit()
    session.close()
select_subdags = PythonOperator(
    task_id='select_subdags',
    dag=dag,
    provide_context=True,
    python_callable=_select_subdags,
)
for name, params in SUBDAGS.items():  # .iteritems() on Python 2
    child_dag_id = 'my_subdag_' + name
    subdag_op = SubDagOperator(
        task_id=child_dag_id,
        dag=dag,
        subdag=my_subdag(dag.dag_id, child_dag_id, params),
    )
    select_subdags >> subdag_op
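The my_subdag factory isn't shown above; here's a minimal sketch of one, assuming each subdag only needs a single task that consumes its params dict (the task id and the work body are placeholders):

def my_subdag(parent_dag_id, child_dag_id, params):
    # SubDagOperator expects the child's dag_id to be "<parent>.<task_id>".
    subdag = DAG(
        dag_id='{}.{}'.format(parent_dag_id, child_dag_id),
        start_date=datetime(2019, 1, 1),  # should line up with the parent DAG
        schedule_interval=None,
    )

    def _work():
        # Placeholder: the real per-subdag work, parameterized by `params`.
        print(params['id'])

    PythonOperator(task_id='work', dag=subdag, python_callable=_work)
    return subdag

(This also needs "from airflow import DAG".) Because the factory closes over params, each generated subdag gets its own element from the hardcoded mapping, which is what stands in for passing list elements dynamically.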
Obviously this isn't ideal, especially when you end up wanting to run just one subdag out of hundreds. We've also run into performance issues with thousands of subdags in a single DAG, since it creates a huge number of task instances, the majority of which are simply skipped.