 

How to really create n tasks in a SubDAG based on the result of a previous task

Tags:

python

airflow

I'm creating a dynamic DAG in Airflow using SubDAGs. What I need is for the number of tasks inside the SubDAG to be determined by the result of a previous task (the subtask_ids variable in the middle_section function should hold the same value as subtask_ids in the initial_task function).

The problem is that I can't access XCom inside the subdag function of a SubDagOperator, because I don't have any context there. I also can't read the value from a database, because of the scheduler's DAG autodiscovery feature: middle_section is executed every few seconds, on every parse of the file.

How do you solve this? How can I create a dynamic number of tasks inside a SubDAG depending on the result of a previous task?

Here is the code I'm developing:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


def initial_task(**context):
    subtask_ids = [0, 1, 2]
    task_instance = context['ti']
    task_instance.xcom_push(key='depot_ids', value=subtask_ids)


def middle_section_task(subtask_id):
    print(subtask_id)


def middle_section(parent_dag, args):
    subdag = DAG(dag_id=f'{parent_dag.dag_id}.middle',
                 default_args=args, schedule_interval='@once')

    subtask_ids = ''  # Read from XCom: this is the part I don't know how to do

    for subtask_id in subtask_ids:
        PythonOperator(task_id=f'middle_section_task_{subtask_id}',
                       python_callable=middle_section_task,
                       op_kwargs={'subtask_id': subtask_id}, dag=subdag)

    return subdag


def end_task(**context):
    print('Finished')


dag = DAG(dag_id='stackoverflow', default_args=args, schedule_interval=None)

initial = PythonOperator(task_id='start_task', python_callable=initial_task,
                         provide_context=True, dag=dag)

middle = SubDagOperator(task_id='middle', subdag=middle_section(dag, args),
                        default_args=args, dag=dag)

end = PythonOperator(task_id='end_task', python_callable=end_task,
                     provide_context=True, dag=dag)

initial >> middle >> end
asked May 15 '18 by josemazo


1 Answer

I had the same issue, and I couldn't solve it 100% in an "Airflow way": I believe the number of tasks and subtasks is fixed at the moment the DAG is validated (parsed). No task runs during validation, so there is no way for Airflow to know beforehand, from a task's result, how many subdag tasks will be scheduled.
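
To make the constraint concrete, here is a minimal sketch (the function names are illustrative, not from real code) of where XCom is and isn't reachable:

def some_python_callable(**context):
    # Inside a running task there is a TaskInstance, so XCom works:
    ids = context['ti'].xcom_pull(task_ids='start_task', key='depot_ids')

def build_subdag(parent_dag_name, child_dag_name, args):
    # A subdag factory body runs at parse/validation time instead:
    # no TaskInstance, no context, nothing to xcom_pull() from.
    ...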

The way I circumvented this issue might not be the best (I'm open to suggestions), but it works:

main_dag.py

# imports for this sketch (Airflow 1.x paths, as in the question);
# args and DAG_NAME are assumed to be defined elsewhere in this file
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
def get_info_from_db():
    # Get info from a DB or somewhere else; this defines the number of subdag tasks to run
    return urls, names

dag = DAG(...)

urls, names = get_info_from_db()

# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)
sub_section = SubDagOperator(
    task_id='import-file',
    subdag=imported_subdag(DAG_NAME, 'subdag-name', args, urls=urls, file_names=names),
    default_args=args,
    dag=dag,
)
end = DummyOperator(task_id='end', default_args=args, dag=dag)

start.set_downstream(sub_section)
sub_section.set_downstream(end)
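
As a side note, the two set_downstream calls are equivalent to the bitshift chaining the question already uses:

start >> sub_section >> end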

Then finally I have my subdag.py. If it lives in a separate file, make sure it is discoverable by Airflow:

# imports for this sketch (Airflow 1.x paths, as in the question)
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
def fetch_files(file_url, file_name):
    # get file and save it to disk
    return file_location

# this is how I get info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
    ti = kwargs['ti']
    task = 'fetch_file-{}'.format(task_id)
    file_location = ti.xcom_pull(task_ids=task)

def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    for i in range(len(urls)):
        # the task name should also be dynamic in order not to have duplicates
        validate_file_operator = PythonOperator(task_id='validate_file-{}'.format(i+1),
                                                python_callable=validate_file,
                                                provide_context=True, dag=dag_subdag, op_kwargs={'task_id': i + 1})
        fetch_operator = PythonOperator(task_id='fetch_file-{}'.format(i+1),
                                        python_callable=fetch_files, dag=dag_subdag,
                                        op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
        fetch_operator.set_downstream(validate_file_operator)
    return dag_subdag
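
If subdag.py really is a separate file, a minimal sketch of how main_dag.py can import the factory (the module name is my assumption; anything importable from the dags folder works):

# at the top of main_dag.py -- assuming subdag.py sits next to it in the dags folder
from subdag import imported_subdag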

Basically, my logic is that get_info_from_db() gets executed at the moment Airflow validates (parses) the DAG file, so all dags and subdags are sized dynamically at that point. If I add or remove content in the db, the number of tasks to run is updated at the next DAG validation.
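
As an illustration only (the SQLite file and table below are made up; any lookup works, but keep it cheap since it runs on every parse), get_info_from_db() could look like:

import sqlite3

def get_info_from_db():
    # Hypothetical example: the scheduler re-runs this every time it
    # parses the DAG file, so the query must stay fast.
    conn = sqlite3.connect('/tmp/files.db')  # made-up path
    try:
        rows = conn.execute('SELECT url, name FROM files_to_fetch').fetchall()
    finally:
        conn.close()
    urls = [row[0] for row in rows]
    names = [row[1] for row in rows]
    return urls, names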

This approach suited my use case, but I hope that in the future Airflow supports this feature (a dynamic number of tasks/subdag tasks) natively.

answered Sep 28 '22 by Hito