Airflow DAG in functions?

I am working in $AIRFLOW_HOME/dags. I have created the following files:

- common
  |- __init__.py   # empty
  |- common.py     # common code
- foo_v1.py        # DAG instantiation

In common.py:

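from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x path (airflow.operators.bash in 2.x)

default_args = ...  # elided in the original question

def create_dag(project, version):
    dag_id = project + '_' + version
    dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
    print('creating DAG ' + dag_id)

    t1 = BashOperator(
        task_id='print_date',  # hypothetical: the original arguments were cut off
        bash_command='date',   # hypothetical
        dag=dag)

    t2 = BashOperator(
        task_id='sleep',       # hypothetical task_id
        bash_command='sleep 5',
        dag=dag)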


In foo_v1.py:

 from common.common import create_dag

 create_dag('foo', 'v1')

When testing the script with python, it looks OK:

 $ python foo_v1.py
 [2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
 creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0

I then launch the webserver and the scheduler locally. My problem is that I don't see any DAG with id foo_v1, and no .pyc file is being created. What am I doing wrong? Why isn't the code in foo_v1.py being executed?

pgrandjean asked Oct 29 '18 at 21:10


3 Answers

To be found by Airflow, the DAG object returned by create_dag() must be in the global namespace of the foo_v1.py module (so create_dag() also has to end with return dag). One way to place a DAG in the global namespace is simply to assign it to a module-level variable:

from common.common import create_dag

dag = create_dag('foo', 'v1')

Another way is to update the global namespace using globals():

globals()['foo_v1'] = create_dag('foo', 'v1')

The latter may look like overkill, but it is useful for creating multiple DAGs dynamically, for example in a for-loop:

for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')
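A quick way to see what that loop leaves behind is to run it inside a throwaway module. In this sketch (plain Python, no Airflow), `FakeDAG` and `create_dag` are hypothetical stand-ins, and the loop body is executed with `exec` against a fresh module's `__dict__`, so `globals()` inside it refers to that module's namespace.

```python
import types


class FakeDAG:
    """Hypothetical stand-in for airflow.DAG; only stores the dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(project, version):
    # Same shape as common.create_dag, including the return statement
    return FakeDAG(project + '_' + version)


# Body of a hypothetical DAG file that generates DAGs in a loop
DAG_FILE = """
for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')
"""

module = types.ModuleType('foo_dags')
module.create_dag = create_dag  # inject the helper instead of importing common.common
exec(DAG_FILE, module.__dict__)

# Every generated DAG is now reachable as a module-level name
dag_names = sorted(name for name, obj in vars(module).items()
                   if isinstance(obj, FakeDAG))
print(len(dag_names))        # 10
print(module.foo_v3.dag_id)  # foo_v3
```

The loop variable `i` also ends up as a module-level name; that is harmless here because only DAG instances are picked out of the namespace.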

Note: Airflow will parse any *.py file placed in $AIRFLOW_HOME/dags, even in subdirectories such as common in your case. If you do not want this, you can use .airflowignore or packaged DAGs.

SergiyKolesnikov answered Sep 21 '22 at 15:09


You need to assign the DAG to a module-level (global) variable. If the DAG isn't in the module's __dict__, Airflow's DagBag processor won't pick it up.

Check out the source here: https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428
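A simplified model of that check, in plain Python without Airflow: roughly, the DagBag loads the DAG file as a module and then looks through the module's namespace for DAG instances. Here `FakeDAG`, `create_dag` and `collect_dags` are hypothetical stand-ins, and the DAG file's source is executed into a fresh module with `exec` rather than loaded from disk the way Airflow does it.

```python
import types


class FakeDAG:
    """Hypothetical stand-in for airflow.DAG; only stores the dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(project, version):
    # Same shape as common.create_dag, including the return statement
    return FakeDAG(project + '_' + version)


def collect_dags(source):
    """Run DAG-file source as a throwaway module and return the dag_ids of
    every FakeDAG found among its module-level names (simplified model)."""
    module = types.ModuleType('dag_file')
    module.create_dag = create_dag  # inject the helper instead of importing it
    exec(source, module.__dict__)
    return sorted(obj.dag_id for obj in vars(module).values()
                  if isinstance(obj, FakeDAG))


# The DAG is created, but no module-level name refers to it
bare_call = "create_dag('foo', 'v1')"
# The DAG is bound to a global, so the namespace scan finds it
assigned = "dag = create_dag('foo', 'v1')"

print(collect_dags(bare_call))  # []
print(collect_dags(assigned))   # ['foo_v1']
```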

nimish answered Sep 21 '22 at 15:09


As mentioned here, you must return the DAG after creating it!

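from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x path (airflow.operators.bash in 2.x)

default_args = ...  # elided in the original question

def create_dag(project, version):
    dag_id = project + '_' + version
    dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
    print('creating DAG ' + dag_id)

    t1 = BashOperator(
        task_id='print_date',  # hypothetical: the original arguments were cut off
        bash_command='date',   # hypothetical
        dag=dag)

    t2 = BashOperator(
        task_id='sleep',       # hypothetical task_id
        bash_command='sleep 5',
        dag=dag)

    return dag  # Add this line to your code!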
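Returning the DAG matters because the caller in foo_v1.py can only bind what the function hands back. This plain-Python sketch (no Airflow needed; `FakeDAG` and both factory functions are hypothetical stand-ins) shows that a factory without `return` gives the caller `None`, so even `dag = create_dag(...)` leaves nothing for the DagBag to find.

```python
class FakeDAG:
    """Hypothetical stand-in for airflow.DAG; only stores the dag_id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag_without_return(project, version):
    # Builds the DAG, then falls off the end, so the caller receives None
    dag = FakeDAG(project + '_' + version)


def create_dag_with_return(project, version):
    dag = FakeDAG(project + '_' + version)
    return dag


dag = create_dag_without_return('foo', 'v1')
print(dag)         # None

dag = create_dag_with_return('foo', 'v1')
print(dag.dag_id)  # foo_v1
```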
Mostafa Ghadimi answered Sep 21 '22 at 15:09
