
Create and use Connections in Airflow operator at runtime [duplicate]

Tags:

airflow

Note: This is NOT a duplicate of

  • Export environment variables at runtime with airflow
  • Set Airflow Env Vars at Runtime

I have to trigger certain tasks on remote systems from my Airflow DAG. The straightforward way to achieve this is SSHHook.

The problem is that the remote system is an EMR cluster which is itself created at runtime (by an upstream task) using EmrCreateJobFlowOperator. So while I can get hold of the job_flow_id of the launched EMR cluster (using XCOM), what I need is an ssh_conn_id to pass to each downstream task.


Looking at the docs and code, it is evident that Airflow will try to look up this connection (using conn_id) in the db and in environment variables, so now the problem boils down to being able to set either of these two at runtime (from within an operator).

This seems a rather common problem because if this isn't achievable then the utility of EmrCreateJobFlowOperator would be severely hampered; but I haven't come across any example demonstrating it.


  • Is it possible to create (and also destroy) either of these from within an Airflow operator?
    1. Connection (persisted in Airflow's db)
    2. Environment Variable (should be accessible to all downstream tasks and not just current task as told here)
  • If not, what are my options?

I'm on

  • Airflow v1.10
  • Python 3.6.6
  • emr-5.15 (can upgrade if required)
y2k-shubham asked Dec 14 '22 13:12



2 Answers

Connections come from the ORM

Yes, you can create connections at runtime, even at DAG creation time if you're careful enough. Airflow is completely transparent about its internal models, so you can interact with the underlying SQLAlchemy layer directly. As exemplified originally in this answer, it's as easy as:

from airflow import settings
from airflow.models import Connection

def create_conn(username, password, host=None):
    # Build the Connection model; host may be left as None.
    new_conn = Connection(conn_id=f'{username}_connection',
                          login=username,
                          host=host)
    new_conn.set_password(password)

    # Persist the new connection in Airflow's metadata database.
    session = settings.Session()
    session.add(new_conn)
    session.commit()

You can, of course, also set any other Connection properties (such as extra) you may require for the EMR connection.

Environments are process-bound

This is not a limitation of Airflow or Python, but (AFAIK for every major OS) environments are bound to the lifetime of a process. When you export a variable in bash, for example, you're simply stating that when you spawn child processes, you want that variable copied into the child's environment. This means that the parent process can't change the child's environment after its creation, and the child can't change the parent's environment.
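A small demonstration of that copy-on-spawn behaviour, using only the standard library. The variable name DEMO_EMR_HOST is made up for the example:

```python
import os
import subprocess
import sys

# Hypothetical variable name, used only for this demonstration.
VAR = "DEMO_EMR_HOST"

os.environ[VAR] = "set-by-parent"

# The child receives a copy of the parent's environment,
# prints what it inherited, then overwrites its own copy.
child_code = (
    "import os;"
    f"print(os.environ['{VAR}']);"
    f"os.environ['{VAR}'] = 'set-by-child'"
)
result = subprocess.run(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    universal_newlines=True,
    check=True,
)

child_saw = result.stdout.strip()   # value the child inherited
parent_after = os.environ[VAR]      # parent's value after the child exited
```

The child sees the parent's value, but its own assignment never reaches the parent, which is why one task cannot hand an environment variable to a sibling or downstream task.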

In short, only the process itself can change its environment after it's created. And considering that worker processes are Airflow subprocesses, it's hard to control the creation of their environments as well. What you can do is write the environment variables into a file and intentionally update the current environment with overrides from that file at the start of each task.
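A minimal sketch of that file-based workaround, assuming the file lives somewhere every worker can read. The helper names, the JSON format, the host address and the connection id emr_ssh are all illustrative choices, not anything Airflow provides. It relies on Airflow resolving a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI:

```python
import json
import os
import tempfile


def save_env_overrides(path, overrides):
    """Write a dict of environment overrides to a JSON file.

    Intended to be called by the upstream task once the cluster exists.
    """
    with open(path, "w") as fh:
        json.dump(overrides, fh)


def load_env_overrides(path):
    """Update this process's environment from the file.

    Intended to be called at the start of each downstream task.
    Returns the overrides that were applied (empty if the file is missing).
    """
    if not os.path.exists(path):
        return {}
    with open(path) as fh:
        overrides = json.load(fh)
    os.environ.update(overrides)
    return overrides


# Round trip with a temporary file; the host and conn id are placeholders.
env_file = os.path.join(tempfile.mkdtemp(), "airflow_env.json")
save_env_overrides(env_file, {"AIRFLOW_CONN_EMR_SSH": "ssh://hadoop@10.0.0.12:22"})
loaded = load_env_overrides(env_file)
```

Each downstream task would have to call load_env_overrides itself before building its hook, since the update only affects the process that performs it.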

villasv answered Dec 16 '22 03:12



One way to do this is to add an Airflow task after EmrCreateJobFlowOperator that uses BashOperator to call aws-cli and retrieve the IP address of the virtual machine where you want to run the task, and then, in the same task, runs the Airflow CLI to create an SSH connection using that IP address.
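A rough sketch of the shell command such a task might run, built as a plain string so it could be passed as bash_command to a BashOperator. Several things here are assumptions: the helper name, the hadoop login, port 22, the connection id, and the placeholder cluster id (which would really come from XCom). It also fetches the master node's DNS name rather than an IP address, and uses the Airflow 1.10 style of the connections command with --conn_uri; later Airflow versions use a different CLI syntax.

```python
import shlex


def build_register_ssh_conn_command(cluster_id, conn_id, login="hadoop"):
    """Return a bash command that looks up the EMR master node and
    registers it as an Airflow SSH connection (hypothetical helper)."""
    cluster = shlex.quote(cluster_id)
    conn = shlex.quote(conn_id)
    return (
        # Stop at the first failing command.
        "set -e; "
        # Ask aws-cli for the master node's DNS name.
        f"HOST=$(aws emr describe-cluster --cluster-id {cluster} "
        "--query Cluster.MasterPublicDnsName --output text); "
        # Register the connection through the Airflow 1.10 CLI.
        f"airflow connections --add --conn_id {conn} "
        f'--conn_uri "ssh://{login}@$HOST:22"'
    )


# Placeholder ids; in a DAG the cluster id would be pulled from XCom.
command = build_register_ssh_conn_command("j-EXAMPLE12345", "emr_ssh")
```

Downstream tasks could then refer to the connection by the same id (here emr_ssh) as their ssh_conn_id.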

kaxil answered Dec 16 '22 02:12
