Use an Airflow connection from a Jinja template

Tags:

airflow

I'm trying to pass DB params to BashOperator using environment variables, but I can't find any documentation or examples showing how to use a connection from a Jinja template.

So I'm looking for something similar to the syntax for variables:

echo {{ var.value.<variable_name> }}

Mikhail Tokarev asked Jan 21 '21 11:01



1 Answer

For Airflow >= 2.2.0:

Assuming a connection with the conn id test_conn exists, you can reference it directly in a template through the built-in conn variable:

{{ conn.test_conn }}

From there you can read any connection attribute, for example:

{{ conn.test_conn.host }}, {{ conn.test_conn.login }}, {{ conn.test_conn.password }} and so on.
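
For the original goal of handing DB params to BashOperator as environment variables, a minimal sketch might look like the following. It assumes Airflow >= 2.2.0 and relies on env being a templated field of BashOperator. The variable names DB_HOST, DB_USER and DB_PASSWORD, the helper build_bash_task and the task id are made up for illustration.

```python
# Templated strings; Airflow renders them when the task runs (needs Airflow >= 2.2.0).
# DB_HOST / DB_USER / DB_PASSWORD are illustrative names, not an Airflow convention.
DB_ENV = {
    "DB_HOST": "{{ conn.test_conn.host }}",
    "DB_USER": "{{ conn.test_conn.login }}",
    "DB_PASSWORD": "{{ conn.test_conn.password }}",
}


def build_bash_task(dag):
    """Hypothetical helper: attach a BashOperator that reads the values from its environment."""
    # Imported lazily so the snippet can be loaded and checked without Airflow installed.
    from airflow.operators.bash import BashOperator

    return BashOperator(
        task_id="bash_task",
        # The shell expands $DB_HOST and $DB_USER; the password stays out of the command text.
        bash_command='echo "host=$DB_HOST user=$DB_USER"',
        env=DB_ENV,
        dag=dag,
    )
```

When env is set, BashOperator uses it instead of inheriting the worker's environment, so you may need to add entries such as PATH yourself if the command depends on them.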

For Airflow < 2.2.0:

There is no ready-to-use macro; however, you can create custom macros to fill the gap.

Creating the macros:

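# BaseHook lives here on Airflow 1.10; the path still works, with a deprecation
# warning, on Airflow 2.0/2.1 (where the preferred import is airflow.hooks.base).
from airflow.hooks.base_hook import BaseHook


def get_host(conn_id):
    # Look up the connection by its conn id and return its host.
    connection = BaseHook.get_connection(conn_id)
    return connection.host


def get_schema(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.schema


def get_login(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.login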

Using them in a DAG:

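from datetime import datetime

from airflow import DAG
# Pre-2.0 import paths; they still resolve, with deprecation warnings, on Airflow 2.0/2.1.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# get_host, get_schema and get_login are the functions defined above.

# Placeholder defaults; adjust them to your environment.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}


def print_function(**context):
    print(f"host={context['host']} schema={context['schema']} login={context['login']}")


# Expose the helper functions to Jinja under these names.
user_macros = {
    'get_host': get_host,
    'get_schema': get_schema,
    'get_login': get_login,
}

with DAG(
    dag_id='connection',
    default_args=default_args,
    schedule_interval=None,
    user_defined_macros=user_macros,
) as dag:

    # Example how to use as function: these calls run when the DAG file is parsed.
    python_op = PythonOperator(
        task_id='python_task',
        provide_context=True,  # needed on Airflow 1.10; deprecated and unnecessary on 2.x
        python_callable=print_function,
        op_kwargs={
            'host': get_host("test_conn"),
            'schema': get_schema("test_conn"),
            'login': get_login("test_conn"),
        },
    )

    # Example how to use as Jinja string: rendered when the task runs.
    bash_op = BashOperator(
        task_id='bash_task',
        bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }}',
    )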

General explanation: the code defines custom functions and registers them through user_defined_macros, which makes them usable as if Airflow itself had defined them. In a template you call such a function as {{ func() }} and, as the example shows, it can accept parameters.

Note that you can create such functions for all fields in the connection object.

Be cautious with how you use this: passing passwords as plain text may not be a good idea.
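
The note about covering every connection field could also be handled with a single generic macro instead of one function per field. The following is only a sketch under assumptions: conn_field, ALLOWED_FIELDS and the get_connection parameter are hypothetical names, and password is deliberately left out of the allowed fields in line with the caution about passwords.

```python
# Fields the macro is willing to expose; password is intentionally excluded.
ALLOWED_FIELDS = ("host", "schema", "login", "port")


def conn_field(conn_id, field, get_connection=None):
    """Return one attribute of an Airflow connection (hypothetical helper)."""
    if field not in ALLOWED_FIELDS:
        raise ValueError(f"field {field!r} is not exposed to templates")
    if get_connection is None:
        # Imported lazily so the function can be exercised without Airflow.
        # On Airflow 2.x the non-deprecated path is airflow.hooks.base.
        from airflow.hooks.base_hook import BaseHook
        get_connection = BaseHook.get_connection
    return getattr(get_connection(conn_id), field)


if __name__ == "__main__":
    # Stand-in for a real connection lookup, just to show the call shape.
    from types import SimpleNamespace

    def fake_lookup(conn_id):
        return SimpleNamespace(host="db.example.com", schema="public", login="app", port=5432)

    print(conn_field("test_conn", "host", get_connection=fake_lookup))
```

Registered with user_defined_macros={'conn_field': conn_field}, it would be called in a template as {{ conn_field("test_conn", "host") }}.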

Elad Kalif answered Sep 21 '22 13:09