How exactly does the subDAG work in Airflow? What does it mean for a subDAG to be enabled?

I have looked at the subDAG section of the Airflow documentation and searched for anything else online that would be helpful, but I have not found anything that explains in detail how to make a subDAG work. One of the requirements for a subDAG to run is that it should be enabled. How do you enable or disable a subDAG?

I wrote some sample code that doesn't show any errors in Airflow; however, when I try to run it, none of the operators in the subDAG get executed.

This is my main dag code:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators import BashOperator
    from airflow.operators.subdag_operator import SubDagOperator

    from linecount_subdag import sub_dag

    parent_dag_name = 'example_linecount_dag'
    child_dag_name = 'example_linecount_subdag'

    args = {
        'owner': 'airflow',
        'start_date': datetime(2016, 4, 20),
        'retries': 0,
    }

    main_dag = DAG(
        dag_id=parent_dag_name,
        default_args=args,
        schedule_interval=timedelta(minutes=5),
        start_date=datetime(2016, 4, 20),
        max_active_runs=1
    )

    subdag = SubDagOperator(
        subdag=sub_dag(parent_dag_name, child_dag_name, args,
                       main_dag.schedule_interval),
        task_id=child_dag_name,
        default_args=args,
        dag=main_dag)

    t = BashOperator(
        task_id='start',
        bash_command='echo "waiting for subdag..."',
        default_args=args,
        dag=main_dag)

    t.set_downstream(subdag)

In this code, the task 'start' succeeds, but the subdag task does nothing: it neither fails nor succeeds.

Here is my subDAG code:

    from airflow.models import DAG
    from airflow.operators import BashOperator


    # The DAG is returned by a factory method
    def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
        dag = DAG(
            '%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            start_date=args['start_date'],
            max_active_runs=1,
        )

        t1 = BashOperator(
            task_id='count_lines',
            bash_command='cat /root/airflow/airflow.cfg | wc -l',
            default_args=args,
            xcom_push=True,
            dag=dag)

        t2 = BashOperator(
            task_id='retrieve_val',
            bash_command='grep "airflow_home" /root/airflow/airflow.cfg',
            default_args=args,
            xcom_push=True,
            dag=dag)

        templated_command = """
        {
            echo "{{ ti.xcom_pull(task_ids='count_lines') }}"
            echo "{{ ti.xcom_pull(task_ids='retrieve_val') }}"
        }"""

        t3 = BashOperator(
            task_id='print_values',
            bash_command=templated_command,
            default_args=args,
            dag=dag)

        t3.set_upstream(t1)
        t3.set_upstream(t2)

        return dag

The three operators in this code count the lines in the file "airflow.cfg", find the value of "airflow_home" in that file, and push both values over XCom so they can be printed. This code works on its own, so I don't think it's the problem.

What do I have to change to make the subDAG execute its operators?

asked Jul 11 '16 by Nikita Semichev

People also ask

What is Subdag in Airflow?

SubDAGs were a legacy feature in Airflow that allowed users to implement reusable patterns of tasks in their DAGs. SubDAGs caused performance and functional issues for many users, and they have been deprecated as of Airflow 2.0 and will be removed entirely in a future release.
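For context, the replacement Airflow recommends is the TaskGroup, which groups tasks inside the parent DAG itself instead of spawning a separate DAG. Below is a minimal sketch, assuming Airflow 2.x; the dag_id, group_id, and commands are illustrative placeholders, not taken from the question:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.task_group import TaskGroup

    with DAG(
        dag_id="example_taskgroup_dag",   # placeholder name
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,           # trigger manually
    ) as dag:
        start = BashOperator(task_id="start", bash_command='echo "starting..."')

        # Tasks defined in this block render as one collapsible group in the
        # UI, much like a SubDAG, but they run inside the parent DAG itself.
        with TaskGroup(group_id="linecount_group") as group:
            count_lines = BashOperator(
                task_id="count_lines",
                bash_command="wc -l < /root/airflow/airflow.cfg",
            )
            retrieve_val = BashOperator(
                task_id="retrieve_val",
                bash_command='grep "airflow_home" /root/airflow/airflow.cfg',
            )

        start >> group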

How does Airflow work?

Airflow is a platform to programmatically author, schedule, and monitor workflows. You author workflows as directed acyclic graphs (DAGs) of tasks, and the Airflow scheduler executes those tasks on an array of workers while following the specified dependencies.

What is the parent DAG in Airflow?

In Airflow, a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.
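To make the definition concrete, here is a minimal sketch of a two-task DAG in the Airflow 2.x style; all names and commands are placeholders:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_two_task_dag",    # placeholder name
        start_date=datetime(2023, 1, 1),
        schedule_interval="@daily",       # run once per day
    ) as dag:
        extract = BashOperator(task_id="extract", bash_command='echo "extract"')
        load = BashOperator(task_id="load", bash_command='echo "load"')

        # 'load' runs only after 'extract' has succeeded
        extract >> load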


1 Answer

I used your code locally and it works fine.

The only things I changed were setting both the outer DAG and the sub DAG to schedule_interval=None and then triggering them manually.

Having a start date of datetime(2016, 4, 20) and a schedule_interval of 5 minutes will flood the Airflow scheduler with many backfill requests.
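Applied to the question's code, that change would look something like this (a sketch; the trigger command is the Airflow 1.x CLI):

    # In the main DAG file: disable scheduling so runs are manual-only.
    main_dag = DAG(
        dag_id=parent_dag_name,
        default_args=args,
        schedule_interval=None,               # was timedelta(minutes=5)
        start_date=datetime(2016, 4, 20),
        max_active_runs=1
    )

    # In the sub_dag factory: give the child DAG the same (None) schedule.
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=schedule_interval,  # None, passed from the parent
        start_date=args['start_date'],
        max_active_runs=1,
    )

Then trigger the parent DAG manually:

    airflow trigger_dag example_linecount_dag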

You might also need to switch from the LocalExecutor to the CeleryExecutor; the LocalExecutor is fairly limited.
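For reference, the executor is selected in airflow.cfg. Switching to Celery would look like the snippet below, though Celery itself and a message broker (e.g. Redis or RabbitMQ) have to be installed and configured separately:

    [core]
    executor = CeleryExecutor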

Here is the output from the last step in the subdag:

    [2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:     {
    [2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:         echo "226"
    [2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:         echo "airflow_home = /root/airflow/"
    [2017-03-08 15:35:18,994] {base_task_runner.py:95} INFO - Subtask:     }
answered Sep 25 '22 by jhnclvr