
How to send server-sent events from Python (FastAPI) upon calls to a function that updates the backend state

I have the following problem: a backend running FastAPI has a streaming endpoint that is used to update the frontend. I want to send these updates every time the function that updates the backend state gets called (either by a scheduled job or by a different endpoint that was hit and caused the state to be updated).

A simplistic version of what I would like to implement would be:

import json

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be updated
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")

And I would like this to keep running forever. Every time the state gets updated, the event_stream unblocks and sends the messages.

I have looked at threading and asyncio, but I have a feeling that I am missing some simple concept of how to do this in Python.

Vini asked Nov 16 '19 21:11


2 Answers

FastAPI is based on Starlette, and a Server-Sent Events plugin (sse-starlette) is available for Starlette:

import asyncio
import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

MESSAGE_STREAM_DELAY = 1  # seconds
MESSAGE_STREAM_RETRY_TIMEOUT = 15000  # milliseconds
app = FastAPI()


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages(): ...
    async def event_generator():
        while True:
            # If the client closed the connection
            if await request.is_disconnected():
                break

            # Check for new messages and return them to the client, if any
            if new_messages():
                yield {
                        "event": "new_message",
                        "id": "message_id",
                        "retry": MESSAGE_STREAM_RETRY_TIMEOUT,
                        "data": "message_content"
                }

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)
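
The loop above polls once per MESSAGE_STREAM_DELAY. If the stream should instead wake up exactly when the state is updated, one possible approach is to give each connected client its own asyncio.Queue and push every update into all of them. The following is only a minimal sketch using the standard library: the names Broadcaster, subscribe, unsubscribe, publish and demo are hypothetical and come from neither FastAPI nor sse-starlette, and it assumes publish() is called from the event loop thread.

```python
import asyncio
import json


class Broadcaster:
    """Hypothetical helper: fans each update out to every connected stream."""

    def __init__(self):
        self._queues = set()

    def subscribe(self):
        queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def unsubscribe(self, queue):
        self._queues.discard(queue)

    def publish(self, messages):
        # Assumption: called from the event loop thread (e.g. an async endpoint).
        for queue in self._queues:
            queue.put_nowait(messages)


async def event_stream(broadcaster):
    """Sleep until publish() is called, then yield one SSE frame per message."""
    queue = broadcaster.subscribe()
    try:
        while True:
            messages = await queue.get()  # no polling: suspends until an update
            for message in messages:
                yield "data: {}\n\n".format(json.dumps(message))
    finally:
        # Runs when the generator is closed, e.g. after the client disconnects.
        broadcaster.unsubscribe(queue)


async def demo():
    """Tiny usage example: one subscriber, one update."""
    broadcaster = Broadcaster()
    stream = event_stream(broadcaster)

    async def next_frame():
        return await stream.__anext__()

    pending = asyncio.create_task(next_frame())
    await asyncio.sleep(0)  # let the generator subscribe and block on get()
    broadcaster.publish([{"text": "hello"}])
    frame = await pending
    await stream.aclose()
    return frame


if __name__ == "__main__":
    print(repr(asyncio.run(demo())))
```

In the endpoint, the async generator could presumably be passed to StreamingResponse(..., media_type="text/event-stream") in the same way as the generator in the question. If the update comes from another thread, such as a scheduled job, publish() would need to be handed to the loop with loop.call_soon_threadsafe, because asyncio.Queue is not thread-safe.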

Saber Hayati answered Nov 14 '22 21:11


The simplest way I could find to solve this was to use threading.Condition.

With that, the code became:

import json
import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        with condition:
            # notify_all() wakes every waiting stream; notify() would wake only one
            condition.notify_all()

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                condition.wait()

            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")
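
One caveat with a bare condition.wait(): an update that arrives while the generator is busy yielding (and so not yet waiting) goes unnoticed until the next update. A possible refinement, shown here only as a sketch, is to keep a version counter and wait with Condition.wait_for on a predicate. The version attribute and the wait_for_update method are hypothetical additions for illustration, not part of the code above, and the FastAPI wiring is left out so the snippet runs with the standard library alone.

```python
import json
import threading


class State:
    def __init__(self):
        self.messages = []
        self.version = 0  # hypothetical counter, bumped on every update
        self.condition = threading.Condition()

    def update(self, new_messages):
        with self.condition:
            self.messages = new_messages
            self.version += 1
            self.condition.notify_all()  # wake every waiting stream

    def wait_for_update(self, last_seen):
        """Block until the version differs from last_seen."""
        with self.condition:
            # Returns at once if an update already happened since last_seen.
            self.condition.wait_for(lambda: self.version != last_seen)
            return self.version, list(self.messages)


def event_stream(state):
    last_seen = 0
    while True:
        last_seen, messages = state.wait_for_update(last_seen)
        for message in messages:
            yield "data: {}\n\n".format(json.dumps(message))


if __name__ == "__main__":
    state = State()
    stream = event_stream(state)
    # Simulate a scheduled job updating the state from another thread.
    threading.Timer(0.1, state.update, args=([{"text": "hello"}],)).start()
    print(repr(next(stream)))
```

Because each stream remembers the last version it saw, an update that lands between two waits is still delivered. Each blocked generator still occupies one worker thread while it waits, which may matter with many connected clients.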




Vini answered Nov 14 '22 23:11