I'm using the BigQueryOperator
extensively in my Airflow DAGs on Google Cloud Composer.
For longer queries, it's better to put each query in its own .sql
file rather than cluttering up the DAG with it. Airflow seems to support this for all SQL Query operators, including the BigQueryOperator, as you can see in the documentation.
My question: after I've written my SQL statement in a .sql
template file, how do I add it to Google Cloud Composer and reference it in a DAG?
From the Airflow documentation for the DAG constructor:
template_searchpath (str or list[str]) – This list of folders (non-relative) defines where Jinja will look for your templates. Order matters. Note that Jinja/Airflow includes the path of your DAG file by default.
Each Cloud Composer environment has a web server that runs the Airflow web interface. You can manage DAGs from the Airflow web interface. In the Google Cloud console, go to the Environments page. To open the Airflow web interface, click the Airflow link for example-environment.
To add or update a DAG, move the Python .py file for the DAG to the /dags folder in the environment's bucket. In the Google Cloud console, go to the Environments page. In the list of environments, find the row with the name of your environment and, in the DAGs folder column, click the DAGs link.
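The same upload can also be done from the command line. A minimal sketch, assuming an environment called my-env-name in us-central1 and a hypothetical DAG file my_dag.py (replace these with your own names):
gcloud composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my_dag.py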
I found an ideal fix for this question. In your DAG declaration you can set template_searchpath,
which is the default path where Airflow will look up Jinja-templated files.
To make this work in your Cloud Composer instance, set it as follows:
dag = DAG(
...
template_searchpath=["/home/airflow/gcs/plugins"],
)
Note that I used the plugins folder for this example. You can use your data folder instead or any folder you want to have inside your bucket.
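To make the connection explicit, here is a minimal sketch of a full DAG that references a template uploaded to the plugins folder. The DAG id, start date, and the file name my-templated-query.sql are placeholders, and the import path assumes the Airflow 1.x contrib operators that shipped with Cloud Composer at the time:
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator  # Airflow 1.x contrib path

# template_searchpath points at the plugins folder in the environment's bucket,
# so sql can be given as a bare file name relative to that folder.
dag = DAG(
    'templated_bq_example',           # placeholder DAG id
    start_date=datetime(2019, 1, 1),  # placeholder start date
    schedule_interval='@daily',
    template_searchpath=['/home/airflow/gcs/plugins'],
)

count_task = BigQueryOperator(
    task_id='count_rows',
    sql='my-templated-query.sql',     # resolved against template_searchpath
    use_legacy_sql=False,
    dag=dag,
)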
After googling around and finding this related question, I've found a way to make this work (although it's not the ideal solution, as we'll see). Here is a working example with three pieces: the SQL template file, the DAG referencing it, and the gcloud
command needed to upload the template to the right place.
(1) The SQL template file
This is just a text file whose filename ends with the .sql
extension. Let's say this file is called my-templated-query.sql
and contains:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
(2) Referencing the template in the DAG file
To reference this template, create an operator like the following:
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

count_task = BigQueryOperator(
    task_id='count_rows',
    sql='/my-templated-query.sql')
(3) Adding the template file to Google Cloud Composer
It turns out that, by default, Airflow looks for template files in the dags folder. To upload our templated file to the dags folder, we run
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
You'll have to replace the env name, location, and source path accordingly.
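If you want to sanity-check that the file actually landed in the dags folder, the matching list command should show it (same placeholder environment name and location as above):
gcloud beta composer environments storage dags list --environment my-env-name --location us-central1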
It doesn't really seem right to upload all these templates to the dag folder. A better Airflow practice is to put your templates in their own folder, and specify the template_searchpath
parameter to point to it when you create your DAG. However, I'm not sure how to do this with Google Cloud Composer.
Update: I've realized it's possible to put subfolders in the DAG folder, which is useful for organizing large numbers of SQL templates. Let's say I put a SQL template file at DAG_FOLDER/dataset1/table1.sql.
In the BigQueryOperator, I can then refer to it using sql='/dataset1/table1.sql'.
If you have a subfolder with lots of files and other subfolders in it, you can also use the dags import
command I show above to upload the entire subfolder recursively--just point --source at the subfolder.
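For instance, reusing the placeholder environment name and location from above, importing the whole dataset1 subfolder would look something like this (the local path is hypothetical):
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/dataset1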