How to do multiprocessing in FastAPI

While serving a FastAPI request, I have a CPU-bound task to do on every element of a list. I'd like to do this processing on multiple CPU cores.

What's the proper way to do this within FastAPI? Can I use the standard multiprocessing module? All the tutorials/questions I found so far only cover I/O-bound tasks like web requests.

asked Jul 30 '20 09:07 by CryingSofa


1 Answer

async def endpoint

You can use loop.run_in_executor with a ProcessPoolExecutor to run the function in a separate process.

    import asyncio
    import concurrent.futures

    @app.post("/async-endpoint")
    async def test_endpoint():
        loop = asyncio.get_event_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            # run cpu_bound_func in a worker process and wait for the result
            result = await loop.run_in_executor(pool, cpu_bound_func)

def endpoint

Since def endpoints are run implicitly in a separate thread, you can use the full power of the multiprocessing and concurrent.futures modules. Note that await cannot be used inside a def function. Examples:

    @app.post("/def-endpoint")
    def test_endpoint():
        ...
        with multiprocessing.Pool(3) as p:
            result = p.map(f, [1, 2, 3])

    @app.post("/def-endpoint/")
    def test_endpoint():
        ...
        with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
            results = executor.map(f, [1, 2, 3])

Note: Creating a process pool inside an endpoint, like creating a large number of threads, can slow down responses as the number of requests increases.
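One way to avoid creating a pool on every request is to create it once and reuse it. The following is only a minimal sketch of that idea for def endpoints: the names get_shared_pool, square and process_items are hypothetical, square stands in for the CPU-bound function f, and the pool size of 3 simply mirrors the samples above.

```python
import threading
from concurrent.futures import ProcessPoolExecutor
from typing import List, Optional

_pool: Optional[ProcessPoolExecutor] = None
_pool_lock = threading.Lock()


def get_shared_pool() -> ProcessPoolExecutor:
    """Create the pool on first use, then hand out the same instance."""
    global _pool
    with _pool_lock:  # def endpoints run in threads, so guard the first creation
        if _pool is None:
            _pool = ProcessPoolExecutor(max_workers=3)
        return _pool


def square(x: int) -> int:
    """Hypothetical stand-in for the CPU-bound function f."""
    return x * x


def process_items(items: List[int]) -> List[int]:
    """What the body of a def endpoint could call instead of building a new pool."""
    return list(get_shared_pool().map(square, items))
```

A def endpoint would then return something like process_items([1, 2, 3]). The pool should still be shut down when the application exits.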


Executing on the fly

The easiest and most native way to execute a function in a separate process and immediately wait for the result is to use loop.run_in_executor with a ProcessPoolExecutor.

A pool, as in the example below, can be created when the application starts; do not forget to shut it down when the application exits. The number of processes in the pool can be set with the max_workers parameter of the ProcessPoolExecutor constructor. If max_workers is None or not given, it defaults to the number of processors on the machine.

The disadvantage of this approach is that the request handler (path operation) waits for the computation to complete in a separate process while the client connection remains open. If the connection is lost for some reason, the result has nowhere to be returned.

    import asyncio
    from concurrent.futures.process import ProcessPoolExecutor
    from fastapi import FastAPI

    from calc import cpu_bound_func

    app = FastAPI()


    async def run_in_process(fn, *args):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


    @app.get("/{param}")
    async def handler(param: int):
        res = await run_in_process(cpu_bound_func, param)
        return {"result": res}


    @app.on_event("startup")
    async def on_startup():
        app.state.executor = ProcessPoolExecutor()


    @app.on_event("shutdown")
    async def on_shutdown():
        app.state.executor.shutdown()
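The handler above processes a single value, while the question asks about running a CPU-bound task on every element of a list. A possible way to fan a list out over the same executor is sketched below. The helper map_in_executor is a hypothetical name, and cpu_bound_func here is only a stand-in for the real function imported from calc.

```python
import asyncio
from concurrent.futures import Executor
from typing import Callable, Iterable, List


def cpu_bound_func(n: int) -> int:
    """Hypothetical stand-in for real CPU-heavy work: sum of squares below n."""
    return sum(i * i for i in range(n))


async def map_in_executor(
    executor: Executor, fn: Callable[[int], int], items: Iterable[int]
) -> List[int]:
    """Submit one task per item and wait for all of them.

    Results come back in the same order as the input items.
    """
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, fn, item) for item in items]
    return await asyncio.gather(*futures)
```

Inside an async def handler this could be called as await map_in_executor(app.state.executor, cpu_bound_func, items), so the elements are spread over the worker processes while the event loop stays free to serve other requests.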

Move to background

CPU-bound tasks are usually executed in the background. FastAPI can run background tasks after a response has been returned; inside such a task you can start your CPU-bound work and asynchronously wait for its result.

In this case, for example, you can immediately return a response of "Accepted" (HTTP code 202) and a unique task ID, continue calculations in the background, and the client can later request the status of the task using this ID.

BackgroundTasks has some useful features. In particular, you can add several tasks (including from dependencies), and the tasks can use resources obtained in dependencies, which are cleaned up only after all tasks have completed, while exceptions can still be handled correctly. This can be seen more clearly in this diagram.

Below is an example that performs minimal task tracking. It assumes that only one instance of the application is running.

    import asyncio
    from concurrent.futures.process import ProcessPoolExecutor
    from http import HTTPStatus

    from fastapi import BackgroundTasks
    from typing import Dict, Optional
    from uuid import UUID, uuid4
    from fastapi import FastAPI
    from pydantic import BaseModel, Field

    from calc import cpu_bound_func


    class Job(BaseModel):
        uid: UUID = Field(default_factory=uuid4)
        status: str = "in_progress"
        result: Optional[int] = None  # stays None until the task completes


    app = FastAPI()
    jobs: Dict[UUID, Job] = {}


    async def run_in_process(fn, *args):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


    async def start_cpu_bound_task(uid: UUID, param: int) -> None:
        jobs[uid].result = await run_in_process(cpu_bound_func, param)
        jobs[uid].status = "complete"


    @app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
    async def task_handler(param: int, background_tasks: BackgroundTasks):
        new_task = Job()
        jobs[new_task.uid] = new_task
        background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
        return new_task


    @app.get("/status/{uid}")
    async def status_handler(uid: UUID):
        return jobs[uid]


    @app.on_event("startup")
    async def startup_event():
        app.state.executor = ProcessPoolExecutor()


    @app.on_event("shutdown")
    async def on_shutdown():
        app.state.executor.shutdown()
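On the client side, the job returned with the 202 response can be polled until its status changes. The sketch below shows one way this might look. The function wait_for_job and its fetch_status argument are hypothetical: fetch_status is assumed to be any callable that performs GET /status/{uid} with an HTTP client of your choice and returns the decoded JSON as a dict.

```python
import time
from typing import Any, Callable, Dict


def wait_for_job(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 0.5,
    timeout: float = 30.0,
) -> Dict[str, Any]:
    """Poll until the job leaves the "in_progress" state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()
        if job["status"] != "in_progress":
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still in progress after {timeout} s")
        time.sleep(interval)  # wait before asking the server again
```

The status strings "in_progress" and "complete" are the ones used by the Job model above; the polling interval and timeout values are arbitrary defaults.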

More powerful solutions

All of the examples above are fairly simple. If you need a more powerful system for heavy distributed computing, you can look at message brokers such as RabbitMQ, Kafka, and NATS, and at libraries built on message brokers, such as Celery.

answered Oct 12 '22 19:10 by alex_noname