Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Get return status from Background Tasks in FastAPI

I have an API which posts jobs upon which background jobs are created and I want to send status of job on another GET api. How to achieve this? In background_work() function I am going with multiprocessing as call internally targets subprocess.call() calls.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def background_work(data: str):
    # some computation on data and return it
    return status

@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(data: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(background_work, data)
    return {"message": "Job Created, check status after some time!"}

@app.get("/get_status")
def status():
    #how to return status of job submitted to background task

like image 621
user user12 Avatar asked May 16 '20 12:05

user user12


Video Answer


3 Answers

I'm using fastAPI exactly like this, combining concurrent.futures.ProcessPoolExecutor() and asyncio to manage long running jobs.

If you don't want to rely on other modules (celery etc), you need to manage yourself the state of your job, and store it somewhere. I store it in the DB so that pending jobs can be resumed after a restart of the server.

Note that you must NOT perform CPU intensive computations in the background_tasks of the app, because it runs in the same async event loop that serves the requests and it will stall your app. Instead submit them to a thread pool or a process pool.

like image 64
Loki Avatar answered Oct 11 '22 23:10

Loki


This currently cannot be achieved with FastAPI, since Background Tasks are just references to callables to be invoked after your Response is sent, they do not store any kind of status.

You will have to use Celery or another library.

like image 30
Gabriel Cappelli Avatar answered Oct 11 '22 21:10

Gabriel Cappelli


Try this pattern:

import time
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

class TaskState:

    def __init__(self):
        self.counter = 0

    def background_work(self):
        while True:
            self.counter += 1
            time.sleep(1)

    def get_state(self):
        return self.counter

state = TaskState()

@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(background_tasks: BackgroundTasks):
    background_tasks.add_task(state.background_work)
    return {"message": "Job Created, check status after some time!"}

@app.get("/get_status")
def status():
    return state.get_state()
like image 8
roi neu Avatar answered Oct 11 '22 22:10

roi neu