How to do persistent database connection in FastAPI?

I am writing my first project in FastAPI and I am struggling a bit. In particular, I am not sure how I am supposed to use an asyncpg connection pool in my app. What I currently have looks like this.

In db.py I have:

import asyncpg

pgpool = None


async def get_pool():
    global pgpool
    # Create the pool lazily on first use, then reuse it
    if pgpool is None:
        pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
    return pgpool

and then in individual files I use get_pool as a dependency:

@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
    # ... do things ...

First, every endpoint I have uses the database, so it seems silly to add that dependency argument to every single function. Second, this seems like a roundabout way of doing things: I define a global, then I define a function that returns that global, and then I inject the function. I am sure there is a more natural way of going about it.

I have seen people suggest just adding whatever I need as a property of the app object:

@app.on_event("startup")
async def startup():
    app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')

but that doesn't work when I have multiple files with routers, because I don't know how to access the app object from a router object.

What am I missing?

Mad Wombat asked Aug 05 '20 17:08



1 Answer

You can use an application factory pattern to set up your application.

To avoid using a global or adding things directly to the app object, you can create your own Database class to hold your connection pool.
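
A minimal sketch of such a holder class, written so that it runs without a database. The `pool_factory` parameter, the `connect`/`disconnect` method names and the `FakePool` stand-in are assumptions made for illustration, not part of the code further down. In a real app the factory could be something like `lambda: asyncpg.create_pool(dsn='MYDB_DSN')`.

```python
import asyncio


class FakePool:
    """Hypothetical stand-in for an asyncpg pool, so the sketch runs without a database."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def make_fake_pool():
    return FakePool()


class Database:
    """Owns one connection pool and its startup/shutdown lifecycle."""

    def __init__(self, pool_factory):
        # pool_factory: zero-argument callable returning an awaitable that yields a pool
        self._pool_factory = pool_factory
        self.pool = None

    async def connect(self):
        # Create the pool only once, even if called repeatedly
        if self.pool is None:
            self.pool = await self._pool_factory()

    async def disconnect(self):
        # Close the pool if one exists, then forget it
        if self.pool is not None:
            await self.pool.close()
            self.pool = None


async def demo():
    db = Database(make_fake_pool)
    await db.connect()
    first = db.pool
    await db.connect()  # second call reuses the existing pool
    same_pool = db.pool is first
    await db.disconnect()
    return same_pool, first.closed, db.pool


result = asyncio.run(demo())
```

In this shape, `connect` would be awaited from the startup handler and `disconnect` from the shutdown handler.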

To pass the connection pool to every route, you can use a middleware that adds the pool to request.state.

Here's the example code:

import asyncpg
from fastapi import FastAPI, Request


class Database:
    """Holds the asyncpg connection pool for the lifetime of the app."""

    def __init__(self):
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(dsn='MYDB_DSN')


def create_app():
    app = FastAPI()
    db = Database()

    @app.middleware("http")
    async def db_session_middleware(request: Request, call_next):
        # Expose the shared pool to every route via request.state
        request.state.pgpool = db.pool
        response = await call_next(request)
        return response

    @app.on_event("startup")
    async def startup():
        await db.create_pool()

    @app.on_event("shutdown")
    async def shutdown():
        # Close all pooled connections when the app stops
        await db.pool.close()

    @app.get("/")
    async def hello(request: Request):
        # Must match the attribute name set in the middleware
        print(request.state.pgpool)

    return app


app = create_app()

Gabriel Cappelli answered Sep 27 '22 01:09