I am working in $AIRFLOW_HOME/dags. I have created the following files:
- common
  |- __init__.py  # empty
  |- common.py    # common code
- foo_v1.py       # DAG instantiation
In common.py:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = ...

def create_dag(project, version):
    dag_id = project + '_' + version
    dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
    print('creating DAG ' + dag_id)
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    t2.set_upstream(t1)
In foo_v1.py:
from common.common import create_dag
create_dag('foo', 'v1')
When testing the script with python, it looks OK:
$ python foo_v1.py
[2018-10-29 17:08:37,016] {__init__.py:57} INFO - Using executor SequentialExecutor
creating DAG pgrandjean_pgrandjean_spark2.1.0_hadoop2.6.0
I then launch the webserver and the scheduler locally. My problem is that I don't see any DAG with id foo_v1, and no .pyc file is being created. What am I doing wrong? Why isn't the code in foo_v1.py being executed?
To be found by Airflow, the DAG object returned by create_dag() must be in the global namespace of the foo_v1.py module. One way to place a DAG in the global namespace is simply to assign it to a module-level variable:
from common.common import create_dag
dag = create_dag('foo', 'v1')
Another way is to update the global namespace using globals():
globals()['foo_v1'] = create_dag('foo', 'v1')
The latter may look like overkill, but it is useful for creating multiple DAGs dynamically, for example in a for loop:
for i in range(10):
    globals()[f'foo_v{i}'] = create_dag('foo', f'v{i}')
Note: any *.py file placed in $AIRFLOW_HOME/dags (even in sub-directories, such as common in your case) will be parsed by Airflow. If you do not want this, you can use .airflowignore or packaged DAGs (see the sketch just below).
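For instance, a minimal .airflowignore sketch placed directly in $AIRFLOW_HOME/dags, assuming you simply want Airflow to skip the common directory; each line is treated as a regular expression matched against file paths under the dags folder:

common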
You need to assign the DAG to a variable exported by the module. If the DAG isn't in the module's __dict__, Airflow's DagBag processor won't pick it up.
Check out the source here: https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L428
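If you want to see what the DagBag processor actually picks up from your dags folder, here is a small debugging sketch. DagBag, its dags dict and import_errors are standard Airflow; the example id in the comment is just what you would expect once foo_v1.py assigns the DAG:

from airflow.models import DagBag

# Parse $AIRFLOW_HOME/dags the same way the scheduler does
dag_bag = DagBag()
print(sorted(dag_bag.dags))     # DAG ids that were found, e.g. ['foo_v1']
print(dag_bag.import_errors)    # per-file parsing errors, if any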
As mentioned here, you must return the DAG after creating it!
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = ...

def create_dag(project, version):
    dag_id = project + '_' + version
    dag = DAG(dag_id, default_args=default_args, schedule_interval='*/10 * * * *', catchup=False)
    print('creating DAG ' + dag_id)
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    t2.set_upstream(t1)
    return dag  # Add this line to your code!
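With create_dag() returning the DAG, foo_v1.py then only needs to bind the returned object to a module-level name, exactly as in the first answer:

# foo_v1.py
from common.common import create_dag

# The assignment puts the DAG into the module's global namespace,
# which is where Airflow's DagBag processor looks for it
dag = create_dag('foo', 'v1')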