I have a POST API that creates background jobs, and I want to report the status of each job through a separate GET API. How can I achieve this? Inside the background_work() function I use multiprocessing, because the work internally makes subprocess.call() calls.
from fastapi import BackgroundTasks, FastAPI
from fastapi.status import HTTP_201_CREATED

app = FastAPI()

def background_work(data: str):
    # some computation on data; note that FastAPI discards a background task's return value
    status = "done"  # placeholder for the computed result
    return status

@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(data: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(background_work, data)
    return {"message": "Job Created, check status after some time!"}

@app.get("/get_status")
def status():
    # how to return status of job submitted to background task
    ...
I use FastAPI in exactly this way, combining concurrent.futures.ProcessPoolExecutor() with asyncio to manage long-running jobs.
If you don't want to rely on other modules (Celery etc.), you need to manage the state of each job yourself and store it somewhere. I store it in the database so that pending jobs can be resumed after a server restart.
Note that you should NOT perform CPU-intensive computations in the app's background_tasks. An async def task runs on the same event loop that serves the requests and will stall your app. A plain def task is run in a thread pool, but it still competes with request handling for the GIL and for worker threads. Instead, submit blocking work to a thread pool and CPU-bound work to a process pool.
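A minimal sketch of the approach described above, assuming an in-memory dict stands in for the database. The names jobs, cpu_bound_work, make_pool, run_job, submit_job and get_status are hypothetical and not taken from the answer; only ProcessPoolExecutor and asyncio come from it.

```python
import asyncio
import uuid
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional, Tuple

# Hypothetical in-memory job table: job_id -> {"status": ..., "result": ...}.
# Assumption: a real deployment would persist this (e.g. in a database).
jobs: dict = {}


def cpu_bound_work(data: str) -> str:
    """Stand-in for the real computation; must be a picklable top-level function."""
    return data.upper()


def make_pool(workers: int = 2) -> Executor:
    """Create the process pool once, e.g. at application startup."""
    return ProcessPoolExecutor(max_workers=workers)


async def run_job(pool: Executor, job_id: str, data: str) -> None:
    """Run the work in the pool and record the outcome without blocking the loop."""
    loop = asyncio.get_running_loop()
    jobs[job_id]["status"] = "running"
    try:
        jobs[job_id]["result"] = await loop.run_in_executor(pool, cpu_bound_work, data)
        jobs[job_id]["status"] = "done"
    except Exception as exc:
        jobs[job_id]["status"] = "failed"
        jobs[job_id]["error"] = repr(exc)


def submit_job(pool: Executor, data: str) -> Tuple[str, asyncio.Task]:
    """Register a job and schedule it; must be called from inside a running event loop."""
    job_id = uuid.uuid4().hex
    jobs[job_id] = {"status": "pending", "result": None}
    # The caller should keep a reference to the task so it is not garbage collected.
    task = asyncio.create_task(run_job(pool, job_id, data))
    return job_id, task


def get_status(job_id: str) -> Optional[dict]:
    """Return the job record, or None for an unknown id."""
    return jobs.get(job_id)
```

In a FastAPI app, the async POST handler would call submit_job() with the pool from make_pool() and return the job id, and the GET handler would return get_status(job_id). The table lives in one process only, so this sketch assumes a single server worker.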
FastAPI's BackgroundTasks currently cannot do this on their own: they are just references to callables that are invoked after your response is sent, and they do not store any kind of status.
You will have to use Celery or another library.
Try this pattern:
import time

from fastapi import BackgroundTasks, FastAPI
from fastapi.status import HTTP_201_CREATED

app = FastAPI()

class TaskState:
    def __init__(self):
        self.counter = 0

    def background_work(self):
        # Runs forever, incrementing the counter once per second.
        while True:
            self.counter += 1
            time.sleep(1)

    def get_state(self):
        return self.counter

# Single shared state object, visible to both endpoints.
state = TaskState()

@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(background_tasks: BackgroundTasks):
    background_tasks.add_task(state.background_work)
    return {"message": "Job Created, check status after some time!"}

@app.get("/get_status")
def status():
    return state.get_state()
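The pattern above tracks a single global counter. If you need the status of each submitted job, one possible extension is to key the state by a job id. This is only a sketch: JobStore, store and the status strings are made-up names, and the lock is there because plain def background tasks run in worker threads.

```python
import threading
import uuid
from typing import Dict, Optional


class JobStore:
    """Hypothetical thread-safe, in-memory map of job id -> status string."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._status: Dict[str, str] = {}

    def create(self) -> str:
        """Register a new job as 'pending' and return its id."""
        job_id = uuid.uuid4().hex
        with self._lock:
            self._status[job_id] = "pending"
        return job_id

    def set(self, job_id: str, status: str) -> None:
        with self._lock:
            self._status[job_id] = status

    def get(self, job_id: str) -> Optional[str]:
        """Return the status, or None if the id is unknown."""
        with self._lock:
            return self._status.get(job_id)


store = JobStore()


def background_work(job_id: str, data: str) -> None:
    """Do the work and record progress under the given job id."""
    store.set(job_id, "running")
    try:
        _ = data[::-1]  # placeholder for the real computation
        store.set(job_id, "done")
    except Exception:
        store.set(job_id, "failed")
```

The POST handler would then call job_id = store.create(), schedule background_tasks.add_task(background_work, job_id, data) and return the id, while a GET handler taking the id as a parameter returns store.get(job_id). The state is lost on restart and is not shared between server processes.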