The Airflow docs say: You can use Jinja templating with every parameter that is marked as "templated" in the documentation. It makes sense that specific parameters in the Airflow world (such as certain parameters to PythonOperator) get templated by Airflow automatically. I'm wondering what the best/correct way is to get a non-Airflow variable templated. My specific use case is something similar to:
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from somewhere import export_votes_data, export_queries_data
from elsewhere import ApiCaucus, ApiQueries

dag = DAG('export_training_data',
          description='Export training data for all active orgs to GCS',
          schedule_interval=None,
          start_date=datetime(2018, 3, 26), catchup=False)

HOST = "http://api-00a.dev0.solvvy.co"
BUCKET = "gcs://my-bucket-name/{{ ds }}/"  # I'd like this to get templated

votes_api = ApiCaucus.get_votes_api(HOST)
queries_api = ApiQueries.get_queries_api(HOST)

export_votes = PythonOperator(task_id="export_votes", python_callable=export_votes_data,
                              op_args=[BUCKET, votes_api], dag=dag)
# export_solutions is assumed to be a task defined elsewhere (not shown here)
export_queries = PythonOperator(task_id="export_queries", python_callable=export_queries_data,
                                op_args=[BUCKET, queries_api, export_solutions.task_id], dag=dag,
                                provide_context=True)
Templating in Airflow works the same as Jinja templating in Python. You enclose the code you want evaluated between double curly braces, and the expression is evaluated at runtime. For a complete list of the available variables, see the Airflow Templates reference.
Jinja is a text-based template language and thus can be used to generate any markup as well as source code. The Jinja template engine allows customization of tags, filters, tests, and globals. Also, unlike the Django template engine, Jinja allows the template designer to call functions with arguments on objects.
Variables are a generic way to store and retrieve arbitrary content or settings as a simple key-value store within Airflow. Variables can be listed, created, updated and deleted from the UI (Admin -> Variables), code or CLI.
The provide_context argument for the PythonOperator will pass along the arguments that are used for templating. From the documentation:
provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
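With the context provided to your callable, you can then do the interpolation in your function:
def your_callable(bucket, api, **kwargs):
    # str.format uses single braces, so write the placeholder as {ds};
    # a Jinja-style {{ ds }} would come out as the literal text { ds }.
    bucket = bucket.format(**kwargs)
    [...]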
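To try the interpolation without a running Airflow instance, here is a minimal sketch. It assumes the context contains a ds key, as the templates reference describes; fake_context is a hand-built stand-in for the keyword arguments Airflow would pass, and the function body is a placeholder rather than the real export logic.

```python
# Hypothetical stand-in for the keyword arguments Airflow passes when
# provide_context=True. Only "ds" is filled in; the real context has more keys.
fake_context = {"ds": "2018-03-26"}

# str.format placeholders use single braces, unlike Jinja's "{{ ds }}".
BUCKET = "gcs://my-bucket-name/{ds}/"


def your_callable(bucket, api, **kwargs):
    """Interpolate the bucket path from the context and return it."""
    bucket = bucket.format(**kwargs)
    # The real export logic would go here; this sketch only returns the path.
    return bucket


resolved = your_callable(BUCKET, api=None, **fake_context)
print(resolved)  # gcs://my-bucket-name/2018-03-26/

# Pitfall: str.format treats "{{" and "}}" as escaped literal braces,
# so a Jinja-style string passes through without being interpolated.
jinja_style = "gcs://my-bucket-name/{{ ds }}/".format(**fake_context)
print(jinja_style)  # gcs://my-bucket-name/{ ds }/
```

The second print shows why the BUCKET string from the question would need single braces for this approach: the double-brace form is valid Jinja but is only an escape sequence to str.format.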