What's the best way to get a SQLAlchemy engine from an Airflow connection ID?
Currently I am creating a hook, retrieving its URI, then using it to create a SQLAlchemy engine.
postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())
This works, but both commands make a connection to the database.
When I have "extra" parameters on the connection, a third connection is needed to retrieve those (see Retrieve full connection URI from Airflow Postgres hook).
Is there a shorter and more direct method?
The create_engine() function from the SQLAlchemy library takes a connection URL and returns an Engine that references both a Dialect and a Pool, which together interpret the DBAPI module's functions and the behavior of the database.
Every pool implementation in SQLAlchemy is thread safe, including the default QueuePool. This means that two threads requesting a connection simultaneously will check out two different connections. By extension, an engine is also thread safe.
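For illustration, here is a minimal sketch of creating an engine directly from a URL (the credentials, host, and database name are placeholders). Note that no DBAPI connection is opened until the engine is first used, for example via engine.connect():
from sqlalchemy import create_engine, text

# Placeholder URL; the engine only records the Dialect and Pool configuration here.
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost:5432/mydb",
    pool_size=5,  # default QueuePool; safe to share across threads
)

# First use checks a connection out of the pool.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))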
To be clear, indeed your commands will make two database connections, but it's to two separate databases (unless you're trying to connect to your Postgres Airflow database). The first line of initializing the hook should not make any connections. Only the second line first grabs the connection details from the Airflow database (which I don't think you can avoid), then uses that to connect to the Postgres database (which I think is the point).
You can make it slightly simpler, though, with:
postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()
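As a quick sketch of how that might be used (assuming Airflow 1.10-style imports; the connection ID and table name are placeholders), the resulting engine works like any other SQLAlchemy engine, e.g. with pandas:
import pandas as pd
from airflow.hooks.postgres_hook import PostgresHook

postgres_hook = PostgresHook(postgres_conn_id="my_postgres")  # placeholder conn ID
engine = postgres_hook.get_sqlalchemy_engine()
df = pd.read_sql("SELECT * FROM my_table", con=engine)  # my_table is hypothetical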
That seems pretty clean, but if you want to be even more direct without going through PostgresHook, you could fetch the connection yourself by querying Airflow's metadata database. However, that means you end up duplicating the code that builds a URI from the connection object. The underlying implementation of get_connection() is a good example to follow if you want to go this route.
from airflow.models import Connection
from airflow.settings import Session

session = Session()
conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
session.close()
... # build uri from connection
create_engine(uri)
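For illustration only, building the URI yourself might look roughly like this (a sketch that assumes a plain Postgres connection and ignores anything stored in the extras; the attribute names are those of Airflow's Connection model):
from urllib.parse import quote_plus
from sqlalchemy import create_engine

# Hypothetical URI construction from the Connection's standard attributes.
uri = "postgresql://{login}:{password}@{host}:{port}/{schema}".format(
    login=conn.login or "",
    password=quote_plus(conn.password) if conn.password else "",
    host=conn.host,
    port=conn.port or 5432,
    schema=conn.schema or "",
)
engine = create_engine(uri)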
Additionally, if you want to be able to access the extras without a separate database fetch beyond what get_uri() or get_sqlalchemy_engine() does, you can override BaseHook.get_connection() to save the connection object to an instance variable for reuse. This requires creating your own hook on top of PostgresHook, so I understand that may not be ideal.
class CustomPostgresHook(PostgresHook):
    def get_connection(self, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        # Can't store this on self.conn because PostgresHook.get_conn() overwrites self.conn
        # with a different type of connection:
        # https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93
        self.conn_obj = conn
        return conn
postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extra_dejson
Some built-in Airflow hooks already have this behavior (grpc, samba, tableau), but it's definitely not standardized.