I'm trying to pass DB params to BashOperator using environment variables, but I can't find any documentation/examples how to use a connection from a Jinja template.
So I'm looking for something similar to variables
echo {{ var.value.<variable_name> }}
Templating in Airflow works exactly the same as templating with Jinja in Python: define your to-be-evaluated code between double curly braces, and the expression will be evaluated at runtime. As we saw in the previous code snippet, execution_date is a variable available at runtime.
It is a text-based template language and thus can be used to generate any markup as well as source code. The Jinja template engine allows customization of tags, filters, tests, and globals. Also, unlike the Django template engine, Jinja allows the template designer to call functions with arguments on objects.
For Airflow >= 2.2.0:
Assuming you have conn id test_conn
you can use macros directly via:
{{ conn.test_conn }}
so you get any connection attribute like:
{{ conn.test_conn.host }}
, {{ conn.test_conn.login }}
, {{ conn.test_conn.password }}
and so on.
For Airflow < 2.2.0:
There is no ready to use macro however you can create custom macros to address this.
Connection example:
Creating the macros:
def get_host(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.host
def get_schema(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.schema
def get_login(conn_id):
connection = BaseHook.get_connection(conn_id)
return connection.login
Using them in a DAG:
def print_function(**context):
print(f"host={context['host']} schema={context['schema']} login={context['login']}")
user_macros = {
'get_host': get_host,
'get_schema': get_schema,
'get_login': get_login,
}
with DAG(
dag_id='connection',
default_args=default_args,
schedule_interval=None,
user_defined_macros=user_macros,
) as dag:
# Example how to use as function
python_op = PythonOperator(
task_id='python_task',
provide_context=True,
python_callable=print_function,
op_kwargs={
'host': get_host("test_conn"),
'schema': get_schema("test_conn"),
'login': get_login("test_conn"),
}
)
# Example how to use as Jinja string
bash_op = BashOperator(
task_id='bash_task',
bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)
Rendering for PythonOperator
example:
Rendering for BashOperator
example:
General Explnation:
What this code does is creating a custom function func()
to be used as user_defined_macros
thus providing the ability to use it just like this macro was defined by Airflow itself.
You can access the templating as: {{ func() }}
as seen in the example the function allow accept parameters.
Note you can create such functions for all fields in the connection object.
be cautious with how you use it, passing passwords as text may not be a good idea.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With