Note: This is NOT a duplicate of
I have to trigger certain tasks at remote systems from my Airflow DAG. The straightforward way to achieve this is SSHHook.
The problem is that the remote system is an EMR cluster which is itself created at runtime (by an upstream task) using EmrCreateJobFlowOperator. So while I can get hold of the job_flow_id of the launched EMR cluster (using XCOM), what I need is an ssh_conn_id to pass to each downstream task.
Looking at the docs and code, it is evident that Airflow will try to look up this connection (using conn_id) in the db and in environment variables, so the problem boils down to being able to set either of these two at runtime (from within an operator).
This seems a rather common problem, because if this isn't achievable then the utility of EmrCreateJobFlowOperator would be severely hampered; but I haven't come across any example demonstrating it.
I'm on
Airflow v1.10
Python 3.6.6
emr-5.15
(can upgrade if required)

Yes, you can create connections at runtime, even at DAG creation time if you're careful enough. Airflow is completely transparent about its internal models, so you can interact with the underlying SQLAlchemy session directly. As exemplified originally in this answer, it's as easy as:
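from airflow import settings
from airflow.models import Connection


def create_conn(username, password, host=None):
    # Build the Connection row; the conn_id is derived from the username.
    new_conn = Connection(conn_id=f'{username}_connection',
                          login=username,
                          host=host or None)
    # Store the password through the model's setter.
    new_conn.set_password(password)

    # Persist the new connection in Airflow's metadata database.
    session = settings.Session()
    session.add(new_conn)
    session.commit()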
You can, of course, also set any other Connection properties (such as conn_type, port or extra) that you may require for the EMR connection.
Setting environment variables at runtime is a different matter. This is not a limitation of Airflow or Python, but (AFAIK for every major OS) environments are bound to the lifetime of a process. When you export a variable in bash, for example, you're simply stating that when you spawn child processes, you want to copy that variable to the child's environment. This means that the parent process can't change the child's environment after its creation, and the child can't change the parent's environment.
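A small sketch of that isolation, using only the Python standard library. The variable name DEMO_SSH_HOST is made up for the demonstration; nothing here is Airflow-specific.

```python
import os
import subprocess
import sys

# Hypothetical variable name, used only for this demonstration.
VAR = "DEMO_SSH_HOST"

# The parent sets the variable in its own environment.
os.environ[VAR] = "set-by-parent"

# The child receives a copy of the parent's environment when it is spawned.
# It overwrites its own copy and prints the result.
child_code = (
    "import os; "
    f"os.environ['{VAR}'] = 'set-by-child'; "
    f"print(os.environ['{VAR}'])"
)
child_output = subprocess.run(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    universal_newlines=True,
    check=True,
).stdout.strip()

# The child's change never reaches the parent.
parent_value = os.environ[VAR]
print("child saw: ", child_output)
print("parent has:", parent_value)
```

The child sees its own modified value, while the parent still holds the value it set before spawning the child.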
In short, only the process itself can change its environment after it's created. And considering that worker processes are Airflow subprocesses, it's hard to control the creation of their environments as well. What you can do is write the environment variables into a file and intentionally update the current environment with overrides from that file at the start of each task.
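A minimal sketch of that file-based approach, under a few assumptions: the function names, the JSON file format and the connection id are all hypothetical, and the file has to live somewhere every worker can read. It relies on Airflow reading connections from environment variables named AIRFLOW_CONN_ followed by the upper-cased conn_id, with the connection given as a URI.

```python
import json
import os
import tempfile


def write_env_overrides(path, overrides):
    """Write the overrides to `path`, replacing the file in one step so a
    task starting at the same moment never reads a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(overrides, handle)
    os.replace(tmp_path, path)


def apply_env_overrides(path, environ=os.environ):
    """Merge the overrides stored at `path` into `environ`.

    Returns the overrides that were applied; a missing file is a no-op.
    """
    try:
        with open(path) as handle:
            overrides = json.load(handle)
    except FileNotFoundError:
        return {}
    environ.update({str(key): str(value) for key, value in overrides.items()})
    return overrides
```

The task that learns the cluster address would call write_env_overrides(path, {"AIRFLOW_CONN_EMR_SSH": "ssh://hadoop@<master-host>:22"}), and every downstream task would call apply_env_overrides(path) as its first step, before any hook looks up the (hypothetical) conn_id emr_ssh.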
One way to do this is to add an Airflow task after EmrCreateJobFlowOperator that uses BashOperator, probably with aws-cli, to retrieve the IP address of the virtual machine where you want to run the task, and in the same task runs the airflow CLI to create an SSH connection using that IP address.
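A rough sketch of those two steps, with some substitutions that should be read as assumptions rather than as the method described above. It uses boto3's describe_cluster call instead of aws-cli to find the master node, and it only builds the CLI command string (in the Airflow 1.10 `airflow connections --add` form) rather than executing it. The function names, the conn_id and the default hadoop login are illustrative; key files and other SSH extras are not covered.

```python
import shlex


def master_public_dns(emr_client, job_flow_id):
    """Return the DNS name of the cluster's master node.

    `emr_client` is expected to behave like boto3.client("emr"). The field
    may be missing while the cluster is still starting, so in practice this
    should run only after the cluster is up.
    """
    response = emr_client.describe_cluster(ClusterId=job_flow_id)
    return response["Cluster"]["MasterPublicDnsName"]


def add_connection_command(conn_id, host, login="hadoop", port=22):
    """Build an Airflow 1.10-style CLI command that registers an SSH
    connection for `host` under `conn_id` (hypothetical helper)."""
    uri = f"ssh://{login}@{host}:{port}"
    args = [
        "airflow", "connections", "--add",
        "--conn_id", conn_id,
        "--conn_uri", uri,
    ]
    # Quote each argument so the string is safe to hand to a shell.
    return " ".join(shlex.quote(arg) for arg in args)
```

Inside a task, the job_flow_id pulled from XCom would be passed to master_public_dns, and the resulting command could be handed to a BashOperator or to subprocess. Downstream tasks would then refer to the same conn_id as their ssh_conn_id.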