SQLAlchemy engine from Airflow database hook

What's the best way to get a SQLAlchemy engine from an Airflow connection ID?

Currently I am creating a hook, retrieving its URI, then using it to create a SQLAlchemy engine.

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = create_engine(postgres_hook.get_uri())

This works, but both commands make a connection to the database.

When the connection has "extra" parameters, a third connection is needed to retrieve them (see "Retrieve full connection URI from Airflow Postgres hook").

Is there a shorter and more direct method?

asked Apr 30 '20 17:04 by Ollie Glass



1 Answer

To be clear, your commands do make two database connections, but to two separate databases (unless you're connecting to the Postgres database that backs Airflow itself). The first line, which initializes the hook, should not make any connections. Only the second line does: it first fetches the connection details from the Airflow database (which I don't think you can avoid), then uses them to connect to the Postgres database (which I think is the point).

You can make it slightly simpler, though, with:

postgres_hook = PostgresHook(self.postgres_conn_id)
engine = postgres_hook.get_sqlalchemy_engine()

That seems pretty clean, but if you want to be even more direct and skip PostgresHook, you could fetch the connection yourself by querying Airflow's database. However, that means you end up duplicating the code that builds a URI from the connection object. The underlying implementation of get_connection() is a good example if you want to go this way.

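from airflow.models import Connection
from airflow.settings import Session
from sqlalchemy import create_engine

session = Session()  # session on Airflow's own metadata database
conn = session.query(Connection).filter(Connection.conn_id == self.postgres_conn_id).one()
... # build uri from connection
engine = create_engine(uri)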
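
As a rough sketch of the "build uri from connection" step, assuming the connection exposes host, port, login, password and schema the way Airflow's Connection model does. ConnFields and build_postgres_uri are hypothetical names used only for illustration, not part of Airflow; the stand-in dataclass keeps the example runnable without an Airflow install.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote


@dataclass
class ConnFields:
    """Hypothetical stand-in for the fields read off an Airflow Connection."""
    host: str
    login: Optional[str] = None
    password: Optional[str] = None
    port: Optional[int] = None
    schema: Optional[str] = None  # for Postgres, Airflow keeps the database name here


def build_postgres_uri(conn: ConnFields) -> str:
    """Assemble a postgresql:// URI, percent-encoding the credentials."""
    auth = ""
    if conn.login:
        auth = quote(conn.login, safe="")
        if conn.password:
            auth += ":" + quote(conn.password, safe="")
        auth += "@"
    host = conn.host
    if conn.port is not None:
        host += f":{conn.port}"
    return f"postgresql://{auth}{host}/{conn.schema or ''}"


example = ConnFields(host="db.example.com", login="etl", password="p@ss/word",
                     port=5432, schema="analytics")
uri = build_postgres_uri(example)
```

The resulting string can then be handed to create_engine(uri). Encoding the login and password matters because characters such as "@" or "/" in a password would otherwise break the URI.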

Additionally, if you want to access the extras without a separate database fetch beyond what get_uri() or get_sqlalchemy_engine() already does, you can override BaseHook.get_connection() to save the connection object as an attribute on the hook for reuse. This requires creating your own hook on top of PostgresHook, so I understand that may not be ideal.

class CustomPostgresHook(PostgresHook):

    @classmethod
    def get_connection(cls, conn_id):  # type: (str) -> Connection
        conn = super().get_connection(conn_id)
        # get_connection() is a classmethod, so there is no self here: store the
        # result on the class. The name conn_obj avoids self.conn, which
        # PostgresHook overwrites with a different type of connection, see
        # https://github.com/apache/airflow/blob/1.10.10/airflow/hooks/postgres_hook.py#L93
        cls.conn_obj = conn
        return conn

postgres_hook = CustomPostgresHook(self.postgres_conn_id)
uri = postgres_hook.get_uri()
# do something with postgres_hook.conn_obj.extra_dejson

Some built-in Airflow hooks already behave this way (grpc, samba, tableau), but it's definitely not standardized.
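
To see the caching idea in isolation, here is a minimal sketch that uses stand-in classes instead of Airflow itself. FakeConnection, FakeBaseHook and CachingHook are hypothetical names, and the lookup counter only imitates a fetch from the metadata database; the point is that one lookup serves both the URI and the extras.

```python
import json


class FakeConnection:
    """Hypothetical stand-in for an Airflow Connection row."""

    def __init__(self, uri, extra):
        self.uri = uri
        self.extra = extra  # JSON string, as it would be stored in the database

    @property
    def extra_dejson(self):
        return json.loads(self.extra) if self.extra else {}


class FakeBaseHook:
    """Hypothetical stand-in for the base hook; counts metadata lookups."""

    lookups = 0

    @classmethod
    def get_connection(cls, conn_id):
        FakeBaseHook.lookups += 1  # imitates one fetch from the metadata database
        return FakeConnection("postgresql://etl@db.example.com/analytics",
                              '{"sslmode": "require"}')


class CachingHook(FakeBaseHook):
    """Keeps the object from the most recent lookup so extras can be reused."""

    conn_obj = None

    @classmethod
    def get_connection(cls, conn_id):
        conn = super().get_connection(conn_id)
        cls.conn_obj = conn  # stored on the class, shared by all instances
        return conn

    def get_uri(self, conn_id):
        return self.get_connection(conn_id).uri


hook = CachingHook()
uri = hook.get_uri("my_postgres")    # performs the single lookup
extras = hook.conn_obj.extra_dejson  # reuses the cached object
```

Because the cached object lives on the class, two hooks of the same class pointing at different connection IDs would overwrite each other's value, so this pattern fits best when a task uses one connection at a time.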

answered Oct 18 '22 09:10 by Daniel Huang