
How to send a progress of operation in a FastAPI app?

I have deployed a FastAPI endpoint:

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List[UploadFile]):
    for i in files:
        .......
        # do a lot of operations on each file

        # after that, I just write the processed data into a MySQL database
        # cur.execute(...)
        # cur.commit()
        .......
    
    # just returning "OK" to confirm data is written into mysql
    return {"response" : "OK"}

I can request output from the API endpoint and it works fine.

Now the biggest challenge for me is knowing how much time each iteration takes, because on the UI side (for the people accessing my API endpoint) I want to show a progress bar (TIME TAKEN) for each iteration/file being processed.

Is there any way to achieve this? If so, how should I proceed?

Thank you.

user_12 asked Nov 18 '20 21:11



1 Answer

Approaches

Polling

The preferred approach to tracking the progress of a task is polling:

  1. After receiving a request to start a task on the backend:
    1. Create a task object in storage (e.g. in-memory, Redis, etc.). The task object must contain at least a task ID, a status (pending, completed), and a result.
    2. Run the task in the background (coroutines, threading, multiprocessing, or a task queue such as Celery, arq, aio-pika, dramatiq, etc.).
    3. Respond immediately with 202 (Accepted), returning the task ID created in the first step.
  2. Update the task status:
    1. This can be done from within the task itself, if it knows about the task store and has access to it. The task periodically updates its own record.
    2. Or use a task monitor (Observer or producer-consumer pattern) that tracks the status of the task and its result and updates the record in storage.
  3. On the client side (front end), start a polling loop that requests the task status from the endpoint /task/{ID}/status, which reads from the task storage.
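
The client half of the polling steps might look roughly like the sketch below. It is only an illustration: `poll_until_done`, `fetch_status`, `max_polls` and `on_progress` are hypothetical names, and the HTTP call to /task/{ID}/status is replaced by a canned sequence of responses so the loop can run without a server.

```python
import time
from typing import Callable, Dict, Optional


def poll_until_done(
    fetch_status: Callable[[], Dict],
    interval: float = 1.0,
    max_polls: int = 60,
    on_progress: Optional[Callable[[Dict], None]] = None,
) -> Dict:
    """Call fetch_status repeatedly until the job leaves "in_progress"."""
    for _ in range(max_polls):
        job = fetch_status()
        if on_progress is not None:
            on_progress(job)  # e.g. redraw a progress bar
        if job.get("status") != "in_progress":
            return job
        time.sleep(interval)
    raise TimeoutError("job did not finish within the polling budget")


# Stand-in for GET /task/{ID}/status (assumption: no real server here).
_responses = iter([
    {"status": "in_progress", "progress": 1},
    {"status": "in_progress", "progress": 2},
    {"status": "complete", "progress": 2},
])
seen = []
final = poll_until_done(lambda: next(_responses), interval=0.0, on_progress=seen.append)
```

In a real client, `fetch_status` would wrap whatever HTTP library is in use; capping the number of polls keeps a lost or stuck job from spinning the loop forever.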

Streaming response

Streaming is a less convenient way to report the status of request processing periodically: the server gradually pushes partial responses without closing the connection. It has significant disadvantages; for example, if the connection breaks, information can be lost. A streaming API is also a different approach from a REST API.
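
On the receiving side, a client has to reassemble the pushed pieces, since chunk boundaries on the wire need not match message boundaries. A minimal sketch, assuming the server sends newline-terminated text messages; `iter_progress_lines` is a hypothetical helper and the byte chunks are made up for illustration:

```python
from typing import Iterable, Iterator


def iter_progress_lines(chunks: Iterable[bytes]) -> Iterator[str]:
    """Reassemble newline-terminated messages from arbitrary byte chunks."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line:
                yield line.decode("utf-8")
    if buffer:  # connection closed mid-message: surface the partial tail
        yield buffer.decode("utf-8", errors="replace")


# Chunks as they might arrive from the network, split at arbitrary points.
chunks = [b"a.csv proc", b"essed\nb.csv processed\nO", b"K\n"]
lines = list(iter_progress_lines(chunks))
```

If the connection drops before the final message arrives, the client only knows about the lines it has already seen, which is the weakness described above.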

Websockets

You can also use websockets for real-time notifications and bidirectional communication.
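
A rough sketch of the server-side push logic, written against a plain `send_json` callable rather than a real socket so it runs on its own. `push_progress`, the message fields and the fake helpers are assumptions for illustration; in a FastAPI websocket handler one would call `await websocket.accept()` first and pass `websocket.send_json` as the callable.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable


async def push_progress(
    send_json: Callable[[Dict[str, Any]], Awaitable[None]],
    filenames: Iterable[str],
    process: Callable[[str], Awaitable[None]],
) -> None:
    """Process each file and push one progress message per file."""
    names = list(filenames)
    for done, name in enumerate(names, start=1):
        await process(name)
        await send_json({"file": name, "done": done, "total": len(names)})
    await send_json({"status": "completed"})


async def _demo() -> list:
    sent = []

    async def fake_send(message):  # stands in for websocket.send_json
        sent.append(message)

    async def fake_process(name):  # stands in for the real per-file work
        await asyncio.sleep(0)

    await push_progress(fake_send, ["a.csv", "b.csv"], fake_process)
    return sent


messages = asyncio.run(_demo())
```

Unlike polling, the client receives each update as soon as a file finishes, at the cost of keeping a connection open for the whole job.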

Links:

  • Examples of polling approach for the progress bar and a more detailed description for django + celery can be found at these links:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • I have provided simplified examples of running background tasks in FastAPI using multiprocessing here:

https://stackoverflow.com/a/63171013/13782669

Old answer:

You could run a task in the background, return its id and provide a /status endpoint that the front end would call periodically. In the status response, you could return the current state of your task (for example, pending, with the number of the file currently being processed). I provided a few simple examples here.

Demo

Polling

Demo of the approach using asyncio tasks (single worker solution):

import asyncio
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: Optional[int] = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and report progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)  # sentinel: no more progress updates


async def start_new_task(uid: UUID, param: int) -> None:
    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))  # keep a reference to the running task

    # monitor task progress until the sentinel arrives
    while (progress := await queue.get()) is not None:
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

Example adapted to the loop from the question

The background processing function is defined with def (not async def), so FastAPI runs it in a thread pool.

import time
from http import HTTPStatus
from typing import Dict, List
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI, File, UploadFile
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]
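
Since the question asks for the time taken per file, the processing loop could also record how long each file took. A minimal standard-library sketch: `process_with_timing` and `handle` are hypothetical names, and the short `time.sleep` call merely stands in for the real per-file work.

```python
import time
from typing import Callable, Dict, Iterable, List


def process_with_timing(
    filenames: Iterable[str],
    handle: Callable[[str], None],
) -> List[Dict[str, object]]:
    """Run handle(name) per file and record how long each call took."""
    timings = []
    for name in filenames:
        started = time.perf_counter()
        handle(name)
        elapsed = time.perf_counter() - started
        timings.append({"filename": name, "seconds": round(elapsed, 3)})
    return timings


# Pretend each file takes about 10 ms to process.
timings = process_with_timing(["a.csv", "b.csv"], lambda name: time.sleep(0.01))
```

In the adapted example, such an entry could be appended to the job record instead of the bare filename, so the status endpoint would return the elapsed time for every finished file.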

Streaming

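import asyncio
from http import HTTPStatus
from typing import List

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import StreamingResponse

app = FastAPI()


async def process_files_gen(files: List[UploadFile]):
    for i in files:
        await asyncio.sleep(5)  # pretend long task; time.sleep() here would block the event loop
        # ...
        # do a lot of operations on each file
        # then report the processed file to the client
        # ...
        yield f"{i.filename} processed\n"
    yield "OK\n"


@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
    return StreamingResponse(process_files_gen(files))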

alex_noname answered Sep 18 '22 23:09