I have the following problem: given a FastAPI backend with a streaming endpoint that is used to update the frontend, I want to push an update every time the function that updates the backend state gets called (which can happen either through a scheduled job or through a different endpoint being hit and changing the state).
A simplistic version of what I would like to implement would be:
import json

from fastapi import FastAPI
from starlette.responses import StreamingResponse


class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify the waiting stream endpoint


app = FastAPI()
state = State()


@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for the state to be updated
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))

    return StreamingResponse(event_stream(), media_type="text/event-stream")
I would like this to keep running forever: every time the state gets updated, event_stream unblocks and sends the messages.
I have looked at threading and asyncio, but I have the feeling that I am missing some simple concept of how to do this in Python.
FastAPI is based on Starlette, and a Server-Sent Events plugin, sse-starlette, is available for Starlette:
import asyncio

import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

MESSAGE_STREAM_DELAY = 1  # second
MESSAGE_STREAM_RETRY_TIMEOUT = 15000  # millisecond

app = FastAPI()


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages():
        # Check whether new messages are available (left as a stub here)
        ...

    async def event_generator():
        while True:
            # Stop streaming if the client closed the connection
            if await request.is_disconnected():
                break

            # Check for new messages and return them to the client if any
            if new_messages():
                yield {
                    "event": "new_message",
                    "id": "message_id",
                    "retry": MESSAGE_STREAM_RETRY_TIMEOUT,
                    "data": "message_content"
                }

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)
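The new_messages() check above is deliberately left as a stub. One way to fill it in, sketched below as a continuation of the snippet above, is to let whatever updates the state push onto a shared asyncio.Queue and have the generator await it instead of polling; the queue and the push_update helper are illustrative names of this sketch, not part of sse-starlette.

import asyncio
import json

# Hypothetical shared queue: the scheduled job or the other endpoint puts
# messages here right after it updates the state.
message_queue: asyncio.Queue = asyncio.Queue()


async def push_update(message: dict):
    await message_queue.put(message)


async def event_generator():
    while True:
        # Blocks until an update arrives, so no sleep/poll loop is needed
        # (client-disconnect handling omitted for brevity).
        message = await message_queue.get()
        yield {
            "event": "new_message",
            "retry": MESSAGE_STREAM_RETRY_TIMEOUT,
            "data": json.dumps(message),
        }

Note that a single queue only feeds one consumer, so with several clients connected to /stream you would need one queue per connection (or a broadcast helper such as the broadcaster package).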
The simplest way I could find to solve this was to use threading.Condition, so the example became:
import json
import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()


class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        with condition:
            # Wake up a stream that is waiting on the condition
            condition.notify()


app = FastAPI()
state = State()


@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                # Block until State.update() calls notify()
                condition.wait()
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))

    return StreamingResponse(event_stream(), media_type="text/event-stream")
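For completeness, here is a rough sketch of the update side, continuing the snippet above and wiring State.update() to the two triggers mentioned in the question; the /update endpoint and the scheduled_refresh function are assumptions of this example, not part of the original code.

# Hypothetical trigger endpoint: any request to it updates the shared state,
# which notifies the waiting event_stream() generator.
@app.post('/update')
def update_messages(new_messages: list[dict]):
    state.update(new_messages)
    return {"status": "ok"}


# Hypothetical scheduled job (e.g. run by APScheduler or a background thread)
# that refreshes the state periodically.
def scheduled_refresh():
    state.update([{"text": "refreshed"}])

Two caveats: condition.notify() wakes at most one waiter, so with several clients connected to /stream, condition.notify_all() is the safer choice; and because event_stream() is a synchronous generator, Starlette iterates it in a thread pool, so each connected client ties up a worker thread while it blocks in condition.wait().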