I'm creating a dynamic DAG in Airflow using SubDAGs. What I need is for the number of tasks inside the SubDAG to be determined by the result of a previous task (the subtask_ids variable in the middle_section function should hold the same value as in the initial_task function).
The problem is that I can't access XCom inside the subdag factory function of a SubDagOperator, because there is no context there. I also can't read the value from a database, because of the scheduler's DAG autodiscovery: middle_section is executed every few seconds during parsing.
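Just to illustrate the problem, here is a minimal sketch (the function name is only for illustration): xcom_pull needs the TaskInstance from the runtime context, which only exists while a task is executing, whereas the subdag factory runs at parse time with no context at all.
def some_task_at_runtime(**context):
    ti = context['ti']  # only exists while the task is executing
    # this works here, but there is no equivalent available inside middle_section()
    subtask_ids = ti.xcom_pull(task_ids='start_task', key='depot_ids')
    return subtask_ids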
How do you solve this: creating a dynamic number of tasks inside a SubDAG depending on the result of a previous task?
Here is the code I'm developing:
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


def initial_task(**context):
    subtask_ids = [0, 1, 2]
    task_instance = context['ti']
    task_instance.xcom_push(key='depot_ids', value=subtask_ids)


def middle_section_task(subtask_id):
    print(subtask_id)


def middle_section(parent_dag, arg):
    subdag = DAG(dag_id=f'{parent_dag.dag_id}.middle',
                 default_args=args, schedule_interval='@once')
    subtask_ids = ''  # Read from XCom
    for subtask_id in subtask_ids:
        PythonOperator(task_id=f'{parent_dag.dag_id}.middle_section_task_{subtask_id}',
                       python_callable=middle_section_task,
                       op_kwargs={'subtask_id': subtask_id}, dag=subdag)
    return subdag


def end_task(**context):
    print('Finished')


dag = DAG(dag_id='stackoverflow', default_args=args, schedule_interval=None)

initial = PythonOperator(task_id='start_task', python_callable=initial_task,
                         provide_context=True, dag=dag)
middle = SubDagOperator(task_id='middle', subdag=middle_section(dag, args),
                        default_args=args, dag=dag)
end = PythonOperator(task_id='end_task', python_callable=end_task,
                     provide_context=True, dag=dag)

initial >> middle >> end
I had the same issue, and I couldn't solve it 100% in an "Airflow way", because the number of tasks and subtasks is fixed at the moment the DAG file is parsed and validated. No task runs during validation, so there is no way for Airflow to know beforehand how many subdag tasks will need to be scheduled.
The way I circumvented this might not be the best (I'm open to suggestions), but it works:
main_dag.py
# imports omitted for brevity

def get_info_from_db():
    # get info from a db or somewhere else; this info defines the number of subdag tasks to run
    return urls, names


dag = DAG(...)

urls, names = get_info_from_db()

# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)

sub_section = SubDagOperator(
    task_id='import-file',
    # the subdag's dag_id must be '<parent_dag_id>.<task_id>', hence 'import-file' here as well
    subdag=imported_subdag(DAG_NAME, 'import-file', args, urls=urls, file_names=names),
    default_args=args,
    dag=dag,
)

end = DummyOperator(task_id='end', default_args=args, dag=dag)

start.set_downstream(sub_section)
sub_section.set_downstream(end)
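Just for illustration, get_info_from_db() could be as simple as the sketch below; the table, columns, and file path are placeholders for whatever your actual source is, not part of my setup.
import sqlite3  # any DB client works; sqlite is only used to keep the sketch self-contained

def get_info_from_db():
    # hypothetical schema: a 'files' table with 'url' and 'name' columns
    conn = sqlite3.connect('/path/to/metadata.db')
    rows = conn.execute('SELECT url, name FROM files').fetchall()
    conn.close()
    urls = [row[0] for row in rows]
    names = [row[1] for row in rows]
    return urls, names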
Then finally I have my subdag.py (make sure it is discoverable by Airflow if it lives in a separate file):
# imports omitted for brevity

def fetch_files(file_url, file_name):
    # get the file and save it to disk
    return file_location


# this is how I get the info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
    ti = kwargs['ti']
    task = 'fetch_file-{}'.format(task_id)
    file_location = ti.xcom_pull(task_ids=task)


def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    for i in range(len(urls)):
        # the task ids must also be dynamic in order not to have duplicates
        validate_file_operator = PythonOperator(task_id='validate_file-{}'.format(i + 1),
                                                python_callable=validate_file,
                                                provide_context=True, dag=dag_subdag,
                                                op_kwargs={'task_id': i + 1})
        fetch_operator = PythonOperator(task_id='fetch_file-{}'.format(i + 1),
                                        python_callable=fetch_files, dag=dag_subdag,
                                        op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
        fetch_operator.set_downstream(validate_file_operator)
    return dag_subdag
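For completeness, if subdag.py is a separate file, main_dag.py just imports the factory; the module path below assumes both files sit in the same dags folder.
from subdag import imported_subdag  # adjust the import path to wherever subdag.py lives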
Basically, my logic is that get_info_from_db() runs at the moment Airflow parses and validates the DAG file, so all DAGs and subdag tasks are laid out dynamically at that point. If I add or remove content in the db, the number of tasks to run is updated at the next DAG validation.
This approach suited my use case, but I hope Airflow will support this feature (a dynamic number of tasks/subdag tasks) natively in the future.