In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.
The precedence rules for a task are as follows:
1. Explicitly passed arguments.
2. Values that exist in the default_args dictionary.
3. The operator's default value, if one exists.
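As a minimal sketch of those rules (the DAG ID, task IDs and values below are made up purely for illustration), an explicitly passed argument on one task overrides the value from default_args, while the other task inherits it:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "airflow",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="precedence_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
) as dag:
    # No explicit retries: falls back to default_args (retries=2).
    t1 = BashOperator(task_id="t1", bash_command="echo t1")
    # Explicitly passed argument wins over default_args: retries=5.
    t2 = BashOperator(task_id="t2", bash_command="echo t2", retries=5)

    t1 >> t2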
Airflow looks in your DAGS_FOLDER for modules that contain DAG objects in their global namespace and adds the objects it finds to the DagBag. Knowing this, all we need is a way to dynamically assign variables in the global namespace.
The unique identifier, or DAG ID: when you instantiate a DAG object you have to specify a DAG ID. The DAG ID must be unique across all of your DAGs. You should never have two DAGs with the same DAG ID, otherwise only one DAG will show up and you might get unexpected behaviour.
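A minimal sketch of that idea; the create_dag factory and the project names are made up for illustration. Each generated DAG gets a unique DAG ID and is assigned into globals(), which is where the DagBag will find it:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def create_dag(dag_id, schedule):
    # Illustrative factory; the real tasks would go inside this context.
    with DAG(dag_id=dag_id, start_date=datetime(2021, 1, 1), schedule_interval=schedule) as dag:
        BashOperator(task_id="run", bash_command=f"echo {dag_id}")
    return dag


# Each DAG gets a unique DAG ID and is assigned into the module's global
# namespace, which is where Airflow's DagBag looks for DAG objects.
for project in ["project_1", "project_2"]:
    dag_id = f"load_{project}"
    globals()[dag_id] = create_dag(dag_id, "@daily")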
I use something like this.
Example tree:
├───dags
│   ├───common
│   │   ├───hooks
│   │   │       pysftp_hook.py
│   │   │
│   │   ├───operators
│   │   │       docker_sftp.py
│   │   │       postgres_templated_operator.py
│   │   │
│   │   └───scripts
│   │           delete.py
│   │
│   ├───project_1
│   │   │   dag_1.py
│   │   │   dag_2.py
│   │   │
│   │   └───sql
│   │           dim.sql
│   │           fact.sql
│   │           select.sql
│   │           update.sql
│   │           view.sql
│   │
│   └───project_2
│       │   dag_1.py
│       │   dag_2.py
│       │
│       └───sql
│               dim.sql
│               fact.sql
│               select.sql
│               update.sql
│               view.sql
│
└───data
    ├───project_1
    │   ├───modified
    │   │       file_20180101.csv
    │   │       file_20180102.csv
    │   │
    │   └───raw
    │           file_20180101.csv
    │           file_20180102.csv
    │
    └───project_2
        ├───modified
        │       file_20180101.csv
        │       file_20180102.csv
        │
        └───raw
                file_20180101.csv
                file_20180102.csv
Update, October 2021: I have a single repository for all projects now. All of my transformation scripts are in the plugins folder (which also contains hooks and operators - basically any code which I import into my DAGs). I try to keep the DAG code pretty bare, so it basically just dictates the schedules and where data is loaded to and from; a rough example of such a bare DAG follows the tree below.
├───dags
│   ├───project_1
│   │       dag_1.py
│   │       dag_2.py
│   │
│   └───project_2
│           dag_1.py
│           dag_2.py
│
├───plugins
│   ├───hooks
│   │       pysftp_hook.py
│   │       servicenow_hook.py
│   │
│   ├───sensors
│   │       ftp_sensor.py
│   │       sql_sensor.py
│   │
│   ├───operators
│   │       servicenow_to_azure_blob_operator.py
│   │       postgres_templated_operator.py
│   │
│   └───scripts
│       ├───project_1
│       │       transform_cases.py
│       │       common.py
│       ├───project_2
│       │       transform_surveys.py
│       │       common.py
│       └───common
│               helper.py
│               dataset_writer.py
│
│   .airflowignore
│   Dockerfile
│   docker-stack-airflow.yml
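For illustration, a bare DAG in dags/project_1/dag_1.py might look roughly like this. The scripts.project_1.transform_cases module and its transform() entry point are assumptions based on the tree above (Airflow puts the plugins folder on sys.path, so modules under it can be imported directly):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed module under plugins/scripts/project_1 (see tree above).
from scripts.project_1 import transform_cases

with DAG(
    dag_id="project_1_dag_1",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # The DAG only wires up the schedule; the actual work lives in plugins.
    PythonOperator(
        task_id="transform_cases",
        python_callable=transform_cases.transform,  # assumed entry point
    )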
I would love to benchmark folder structures with other people as well. It probably depends on what you are using Airflow for, but I will share my case. I build data pipelines for a data warehouse, so at a high level I basically have two steps:
1. Extract data from the sources and load it into a data lake.
2. Transform the data from the data lake into the data warehouse (dims, facts and cubes).
Today I organize the files into three main folders that try to reflect the logic above:
├── dags
│   ├── dag_1.py
│   └── dag_2.py
├── data-lake
│   ├── data-source-1
│   └── data-source-2
└── dw
    ├── cubes
    │   ├── cube_1.sql
    │   └── cube_2.sql
    ├── dims
    │   ├── dim_1.sql
    │   └── dim_2.sql
    └── facts
        ├── fact_1.sql
        └── fact_2.sql
This is more or less my basic folder structure.
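For illustration, a minimal sketch of how the dw folder could plug into a DAG; the connection id, the search path and the dependency order here are assumptions, not a definitive setup. template_searchpath lets the .sql files be referenced by their relative paths:

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="build_dw",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    # Assumed location of the dw folder inside the Airflow deployment.
    template_searchpath=["/opt/airflow/dw"],
) as dag:
    dim_1 = PostgresOperator(
        task_id="dim_1",
        postgres_conn_id="dw",  # assumed connection id
        sql="dims/dim_1.sql",
    )
    fact_1 = PostgresOperator(
        task_id="fact_1",
        postgres_conn_id="dw",
        sql="facts/fact_1.sql",
    )
    # Assumed dependency: facts are built after dimensions.
    dim_1 >> fact_1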