Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

FastAPI middleware peeking into responses

I try to write a simple middleware for FastAPI peeking into response bodies.

In this example I just log the body content:

app = FastAPI()

@app.middleware("http")
async def log_request(request, call_next):
    logger.info(f'{request.method} {request.url}')
    response = await call_next(request)
    logger.info(f'Status code: {response.status_code}')
    async for line in response.body_iterator:
        logger.info(f'    {line}')
    return response

However it looks like I "consume" the body this way, resulting in this exception:

  ...
  File ".../python3.7/site-packages/starlette/middleware/base.py", line 26, in __call__
    await response(scope, receive, send)
  File ".../python3.7/site-packages/starlette/responses.py", line 201, in __call__
    await send({"type": "http.response.body", "body": b"", "more_body": False})
  File ".../python3.7/site-packages/starlette/middleware/errors.py", line 156, in _send
    await send(message)
  File ".../python3.7/site-packages/uvicorn/protocols/http/httptools_impl.py", line 515, in send
    raise RuntimeError("Response content shorter than Content-Length")
RuntimeError: Response content shorter than Content-Length

Trying to look into the response object I couldn't see any other way to read its content. What is the correct way to do it?

like image 429
FireAphis Avatar asked Mar 20 '20 16:03

FireAphis


3 Answers

This can be done easily with BackgroundTasks (https://fastapi.tiangolo.com/tutorial/background-tasks/)

Non blocking, the code executes after the response is sent to client, pretty easy to add.

Just take the request object and pass it to the Background Task. Also, before returning the response dict (or whatever data), pass it to the Background Task. The downside is that a part of the response is lost, only the data returned will be passed to the BT.

Also, another downside: those Background Tasks have to be added to each endpoint.

E.g.

from fastapi import BackgroundTasks, FastAPI
from starlette.requests import Request

app = FastAPI()

async def log_request(request, response):
    logger.info(f'{request.method} {request.url}')
    logger.info(f'{response['message']}')


@app.post("/dummy-endpoint/")
async def send_notification(request: Request, background_tasks: BackgroundTasks):
    my_response = {"message": "Notification sent in the background"}
    background_tasks.add_task(log_request, request=request, response=my_response)
    return my_response
like image 123
asdf Avatar answered Oct 18 '22 17:10

asdf


I know this is a relatively old post now, but I recently ran into this problem and came up with a solution:

Middleware Code

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import json
from .async_iterator_wrapper import async_iterator_wrapper as aiwrap

class some_middleware(BaseHTTPMiddleware):
   async def dispatch(self, request:Request, call_next:RequestResponseEndpoint):
      # --------------------------
      # DO WHATEVER YOU TO DO HERE
      #---------------------------
      
      response = await call_next(request)

      # Consuming FastAPI response and grabbing body here
      resp_body = [section async for section in response.__dict__['body_iterator']]
      # Repairing FastAPI response
      response.__setattr__('body_iterator', aiwrap(resp_body)

      # Formatting response body for logging
      try:
         resp_body = json.loads(resp_body[0].decode())
      except:
         resp_body = str(resp_body)

async_iterator_wrapper Code from TypeError from Python 3 async for loop

class async_iterator_wrapper:
    def __init__(self, obj):
        self._it = iter(obj)
    def __aiter__(self):
        return self
    async def __anext__(self):
        try:
            value = next(self._it)
        except StopIteration:
            raise StopAsyncIteration
        return value

I really hope this can help someone! I found this very helpful for logging.

Big thanks to @Eddified for the aiwrap class

like image 15
SteveTheProgrammer Avatar answered Oct 18 '22 19:10

SteveTheProgrammer


I had a similar need in a FastAPI middleware and although not ideal here's what we ended up with:

app = FastAPI()

@app.middleware("http")
async def log_request(request, call_next):
    logger.info(f'{request.method} {request.url}')
    response = await call_next(request)
    logger.info(f'Status code: {response.status_code}')
    body = b""
    async for chunk in response.body_iterator:
        body += chunk
    # do something with body ...
    return Response(
        content=body,
        status_code=response.status_code,
        headers=dict(response.headers),
        media_type=response.media_type
    )

Be warned that such an implementation is problematic with responses streaming a body that would not fit in your server RAM (imagine a response of 100GB).

Depending on what your application does, you will rule if it is an issue or not.


In the case where some of your endpoints produce large responses, you might want to avoid using a middleware and instead implement a custom ApiRoute. This custom ApiRoute would have the same issue with consuming the body, but you can limit it's usage to a particular endpoints.

Learn more at https://fastapi.tiangolo.com/advanced/custom-request-and-route/

like image 13
Thomasleveil Avatar answered Oct 18 '22 18:10

Thomasleveil