Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I use aio-pika with FastAPI?

I want to create a REST service using FastAPI and aio-pika while working asynchronously. For other async database drivers, I could create clients on startup, when get them in route handlers. For example, with motor I would declare simple connection manager:

from motor.motor_asyncio import AsyncIOMotorClient


class Database:
    client: AsyncIOMotorClient = None


db = Database()


async def connect_to_mongo():
    db.client = AsyncIOMotorClient("mongo:27017")


async def close_mongo_connection():
    db.client.close()


async def get_mongo_client() -> AsyncIOMotorClient:
    return db.client

Then add couple of handlers:

app.add_event_handler("startup", connect_to_mongo)
app.add_event_handler("shutdown", close_mongo_connection)

and then just use get_mongo_client to get one to my handler.

Problem here is that aio-pika needs asyncio loop to function. Here is an example from the docs:

connection = await aio_pika.connect_robust(
        "amqp://guest:[email protected]/", loop=loop
    )

And with FastAPI I don't have asyncio loop. Are there any way to use it with interface like in example? Can I just create new loop using asyncio.get_event_loop() and pass it to the connect_robust without really using it anywhere? Like this:

connection = await aio_pika.connect_robust(
            "amqp://guest:[email protected]/", loop=asyncio.get_event_loop()
        )
like image 205
keddad Avatar asked Nov 07 '25 06:11

keddad


1 Answers

Ok, so, according to the docs, I can just use connect instead of connect_robust:

connection = await aio_pika.connect(
        "amqp://guest:[email protected]/"
    )
like image 182
keddad Avatar answered Nov 08 '25 22:11

keddad