In my project I am trying to start a REST API (built with FastAPI and run with Hypercorn). Additionally, I want to start a RabbitMQ consumer (with aio_pika) on startup.
aio_pika offers a robust connection that automatically reconnects on failure. If I run the code below with hypercorn app:app,
the consumer and the REST interface start correctly, but the reconnect from aio_pika no longer works. How can I achieve a production-stable RabbitMQ consumer and REST API in two different processes (or threads)? My Python version is 3.7. Please note that I am actually a Java and Go developer, in case my approach is not the Python way :-)
import asyncio

import aio_pika
from fastapi import FastAPI

app = FastAPI()


@app.on_event("startup")
def startup():
    # Creates a second event loop that nothing ever runs
    loop = asyncio.new_event_loop()
    asyncio.ensure_future(main(loop))


@app.get("/")
def read_root():
    return {"Hello": "World"}


async def main(loop):
    connection = await aio_pika.connect_robust(
        "amqp://guest:[email protected]/", loop=loop
    )
    async with connection:
        queue_name = "test_queue"
        # Creating channel
        channel = await connection.channel()  # type: aio_pika.Channel
        # Declaring queue
        queue = await channel.declare_queue(
            queue_name,
            auto_delete=True
        )  # type: aio_pika.Queue
        async with queue.iterator() as queue_iter:
            # Cancel consuming after __aexit__
            async for message in queue_iter:
                async with message.process():
                    print(message.body)
                    if queue.name in message.body.decode():
                        break
With the help of @pgjones I managed to change the consumer startup to:
@app.on_event("startup")
def startup():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
The job is now started with asyncio.ensure_future, and the current event loop (instead of a newly created one) is passed in as an argument, which solved the issue.
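As far as I understand it (this is my reading, not something I found confirmed in the aio_pika documentation), the reason is that a loop created with asyncio.new_event_loop() is never run by anyone, so anything scheduled on it, such as a delayed reconnect attempt, never fires. The following is a minimal sketch with plain asyncio only, no RabbitMQ involved; the names demo, fired and idle_loop are made up for the illustration:

```python
import asyncio


async def demo():
    fired = []

    # The loop that is actually running this coroutine
    # (in the real app: the loop Hypercorn runs).
    running_loop = asyncio.get_event_loop()

    # A second loop that is created but never run,
    # like the one from asyncio.new_event_loop() in the first version.
    idle_loop = asyncio.new_event_loop()

    # Schedule the same kind of delayed callback on both loops.
    running_loop.call_later(0.01, fired.append, "running loop")
    idle_loop.call_later(0.01, fired.append, "idle loop")

    # Give the callbacks plenty of time to fire.
    await asyncio.sleep(0.05)

    idle_loop.close()
    return fired


result = asyncio.run(demo())
print(result)  # ['running loop']
```

Only the callback on the running loop fires. The one on the idle loop is accepted without any error and then silently never executed, which would match the symptom of a consumer that starts fine but never reconnects.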
It would be interesting to hear if somebody has a different or better approach. Thanks!
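One refinement I can think of, offered as a sketch rather than a tested production setup: keep a reference to the task returned by asyncio.ensure_future and cancel it on shutdown, so the cleanup in the async with blocks gets a chance to run. The helper class BackgroundTask and the stand-in coroutine fake_consumer below are hypothetical names, and the example uses plain asyncio instead of aio_pika so it runs without a broker:

```python
import asyncio


class BackgroundTask:
    """Hypothetical helper that owns one long-running coroutine."""

    def __init__(self, coro_factory):
        self._coro_factory = coro_factory
        self._task = None

    def start(self):
        # Call this while the event loop is running, e.g. from a startup hook.
        # Keeping the reference prevents the task from being lost.
        self._task = asyncio.ensure_future(self._coro_factory())

    async def stop(self):
        # Cancel the task and wait for it, so its cleanup code can finish.
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None


async def fake_consumer(log):
    # Stand-in for the aio_pika consume loop from the question.
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    finally:
        # Plays the role of leaving the "async with connection" block.
        log.append("closed")


async def demo():
    log = []
    worker = BackgroundTask(lambda: fake_consumer(log))
    worker.start()
    await asyncio.sleep(0.05)
    await worker.stop()
    return log


log = asyncio.run(demo())
print(log[0], log[-1])  # tick closed
```

In the FastAPI app, start() would go into the startup handler and stop() into an async handler registered with @app.on_event("shutdown"), with main (without the loop argument) passed in place of fake_consumer.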