 

How to use connection hooks with `KubernetesPodOperator` as environment variables on Apache Airflow on GCP Cloud Composer

I'd like to use connections saved in Airflow in a task that uses the KubernetesPodOperator.

When developing the image, I used environment variables to pass database connection information down to the container, but the production environment has the databases saved as connection hooks.

What is the best way to extract the database connection information and pass it down to the container?

# Import path for Airflow 1.10 / Cloud Composer; newer releases import the
# operator from the cncf.kubernetes provider package instead.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

env_vars = {'database_usr': 'xxx', 'database_pas': 'xxx'}

KubernetesPodOperator(
    dag=dag,
    task_id="example-task",
    name="example-task",
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
    image_pull_policy="Always",
    arguments=["-v", "image-command", "image-arg"],
    env_vars=env_vars,
)
wab asked Mar 16 '20 at 17:03


People also ask

How does Kubernetes performance work in Cloud Composer?

When you create a Cloud Composer environment, you specify its performance parameters, including performance parameters for the environment's cluster. Launching Kubernetes pods into the environment cluster can cause competition for cluster resources, such as CPU or memory.

What is the difference between kubernetespodoperator and Google Kubernetes Engine operators?

KubernetesPodOperator launches Kubernetes pods in your environment's cluster. In comparison, Google Kubernetes Engine operators run Kubernetes pods in a specified cluster, which can be a separate cluster that is not related to your environment. You can also create and delete clusters using Google Kubernetes Engine operators.
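To illustrate the difference, here is a minimal sketch, assuming the Airflow 1.10-era contrib import paths that Cloud Composer used at the time; the project, location, and cluster names are placeholders:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.operators.gcp_container_operator import GKEPodOperator

# Runs a pod inside the Composer environment's own GKE cluster.
in_environment = KubernetesPodOperator(
    task_id="pod-in-composer-cluster",
    name="pod-in-composer-cluster",
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
)

# Runs a pod in a separate, explicitly named GKE cluster.
in_other_cluster = GKEPodOperator(
    task_id="pod-in-other-cluster",
    name="pod-in-other-cluster",
    project_id="my-project",          # placeholder
    location="europe-west1-b",        # placeholder
    cluster_name="my-gke-cluster",    # placeholder
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
)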

How do I set up Kubernetes secrets in airflow?

The first secret, airflow-secrets, is set to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable). The second secret, service-account, mounts service-account.json, a file with a service account token, to /var/secrets/google.
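As a rough sketch of that setup (assuming the Secret helper shipped with the Kubernetes pod operator support; the secret names follow the example above, and sql_alchemy_conn is an assumed key name):

from airflow.contrib.kubernetes.secret import Secret  # airflow.kubernetes.secret in newer releases
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Expose a key of the Kubernetes Secret `airflow-secrets` as the SQL_CONN
# environment variable inside the pod.
secret_env = Secret(
    deploy_type="env",
    deploy_target="SQL_CONN",
    secret="airflow-secrets",
    key="sql_alchemy_conn",  # assumed key name
)

# Mount the `service-account` Secret so service-account.json is available
# under /var/secrets/google inside the pod.
secret_volume = Secret(
    deploy_type="volume",
    deploy_target="/var/secrets/google",
    secret="service-account",
    key="service-account.json",
)

KubernetesPodOperator(
    task_id="example-with-secrets",
    name="example-with-secrets",
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
    secrets=[secret_env, secret_volume],
)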

Why can't I launch Kubernetes pods into the Environment cluster?

Launching Kubernetes pods into the environment cluster can cause competition for cluster resources, such as CPU or memory. Because the Airflow schedulers and workers run in the same GKE cluster, they won't work properly if this competition results in resource starvation.


1 Answer

My current solution is to grab the variables from the connection using BaseHook:

from airflow.hooks.base_hook import BaseHook


def connection_to_dict(connection_id):
    """Returns connection params from Airflow as a dictionary.

    Parameters
    ----------
    connection_id : str
        Name of the connection in Airflow, e.g. `mysql_default`

    Returns
    -------
    dict
        Unencrypted values.
    """
    conn_obj = BaseHook.get_connection(connection_id)
    # The Connection model's attributes (host, login, port, ...) as a dict.
    d = conn_obj.__dict__
    # Passwords are stored encrypted; replace with the decrypted value.
    if d.get('is_encrypted'):
        d['password'] = conn_obj.get_password()
    return d

and then pass those values as environment variables to the KubernetesPodOperator.
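For example, something along these lines, with mysql_default as a hypothetical connection id and the env var names taken from the question:

conn = connection_to_dict("mysql_default")  # hypothetical connection id

KubernetesPodOperator(
    dag=dag,
    task_id="example-task",
    name="example-task",
    namespace="default",
    image="eu.gcr.io/repo/image:tag",
    image_pull_policy="Always",
    arguments=["-v", "image-command", "image-arg"],
    env_vars={
        # The dict keys mirror the Connection model's columns; depending on the
        # Airflow version the raw password column may be stored as `_password`,
        # while the helper above adds a decrypted `password` key for encrypted values.
        "database_usr": conn["login"],
        "database_pas": conn.get("password") or conn.get("_password"),
        "database_host": conn["host"],
    },
)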

wab answered Oct 12 '22 at 22:10