In the FastAPI web framework, why does httpx perform so much worse than aiohttp under high concurrency? Both reuse the same client instance.
test code
import uvicorn
from fastapi import FastAPI
import requests
import httpx
import aiohttp
app = FastAPI(description="sync and async route func test")
aio_client = httpx.AsyncClient()
aio_session: aiohttp.ClientSession | None = None
req_session = requests.Session()
@app.on_event("startup")
async def startup_event():
    global aio_session
    aio_session = aiohttp.ClientSession()
async def async_get(url):
    # async with aio_session.get(url) as resp:
    #     return resp
    resp = await aio_client.get(url)
    return resp
@app.get("/ping")
async def ping():
    return "pong"
@app.get("/async_route_func_demo")
async def async_route_func():
    url = "https://juejin.cn/"
    resp = await async_get(url)
    # print("resp", resp)
    return "async_route_func_demo"
@app.get("/async_route_use_sync_io_demo")
async def async_route_func():
    url = "https://juejin.cn/"
    resp = req_session.get(url)
    # print("resp", resp)
    return "async_route_func_demo"
@app.get("/sync_route_func_demo")
def sync_route_func():
    url = "https://juejin.cn/"
    resp = req_session.get(url)
    # print("resp", resp)
    return "sync_route_func_demo"
def main():
    uvicorn.run(app, log_level="warning")
if __name__ == '__main__':
    main()
python version: Python 3.10.12
operating system: macOS (Apple M1 Pro)
requirements
fastapi==0.101.0
uvicorn==0.23.2
requests==2.32.0
httpx==0.27.0
wrk test result
While testing FastAPI's synchronous and asynchronous routes, I found that httpx's performance was not ideal. I don't know whether there is something wrong with my code; could you please help me check it?
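A side note on the test code itself, separate from the httpx-vs-aiohttp gap: the /async_route_use_sync_io_demo route calls the blocking requests session inside an async def function, so each call blocks the event loop until the upstream response arrives. If that variant is meant to stay an async route, the blocking call can be pushed to a worker thread. A minimal sketch using Starlette's run_in_threadpool helper (re-exported by FastAPI); the route path and function name here are made up for illustration:
from fastapi.concurrency import run_in_threadpool
@app.get("/async_route_use_threadpool_demo")
async def async_route_use_threadpool():
    url = "https://juejin.cn/"
    # The blocking requests call runs in a worker thread,
    # so the event loop stays free to serve other requests.
    resp = await run_in_threadpool(req_session.get, url)
    return "async_route_use_threadpool_demo"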
If you also find httpx particularly slow, it is already heavily used in your codebase (making a migration difficult), and you ended up here after Googling, here is a simple conversion path: keep the httpx API but let aiohttp do the actual I/O through a custom transport:
import typing

import aiohttp
import httpx

class AiohttpTransport(httpx.AsyncBaseTransport):
    def __init__(self, session: typing.Optional[aiohttp.ClientSession] = None):
        self._session = session or aiohttp.ClientSession()
        self._closed = False
    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        if self._closed:
            raise RuntimeError("Transport is closed")
        # Convert the httpx request headers into a plain dict for aiohttp
        headers = dict(request.headers)
        
        # Prepare the request parameters
        method = request.method
        url = str(request.url)
        content = request.content
        
        async with self._session.request(
            method=method,
            url=url,
            headers=headers,
            data=content,
            allow_redirects=False,
        ) as aiohttp_response:
            # Read the full response body
            content = await aiohttp_response.read()

            # Convert the response headers back to httpx's format
            headers = [(k.lower(), v) for k, v in aiohttp_response.headers.items()]

            # Build an httpx.Response from the aiohttp response
            return httpx.Response(
                status_code=aiohttp_response.status,
                headers=headers,
                content=content,
                request=request
            )
    async def aclose(self):
        if not self._closed:
            self._closed = True
            await self._session.close()
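To adopt the transport without touching existing httpx call sites, pass it to the client via the transport argument; the benchmark further down uses exactly this pattern. A minimal sketch (the client is created inside a coroutine because aiohttp.ClientSession is best constructed while an event loop is running):
async def demo():
    # All requests issued through this httpx client are carried by aiohttp,
    # so code written against the httpx API keeps working unchanged.
    client = httpx.AsyncClient(transport=AiohttpTransport())
    resp = await client.get("https://juejin.cn/")
    print(resp.status_code)
    await client.aclose()  # also closes the underlying aiohttp session via the transport's aclose()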
Generally speaking, I found httpx slow in processing many concurrent requests. Here is my benchmark script and results:
import asyncio
import typing
import time
import aiohttp
from aiohttp import ClientSession
import httpx
from concurrent.futures import ProcessPoolExecutor
import statistics
ADDRESS = "https://www.baidu.com"
async def request_with_aiohttp(session):
    async with session.get(ADDRESS) as rsp:
        return await rsp.text()
async def request_with_httpx(client):
    rsp = await client.get(ADDRESS)
    return rsp.text
# Benchmark functions
async def benchmark_aiohttp(n):
    async with ClientSession() as session:
        # make sure code is right
        print(await request_with_aiohttp(session))
        start = time.time()
        tasks = []
        for i in range(n):
            tasks.append(request_with_aiohttp(session))
        await asyncio.gather(*tasks)
        return time.time() - start
async def benchmark_httpx(n):
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(
            timeout=10,
        ),
    ) as client:
        # make sure code is right
        print(await request_with_httpx(client))
        start = time.time()
        tasks = []
        for i in range(n):
            tasks.append(request_with_httpx(client))
        await asyncio.gather(*tasks)
        return time.time() - start
    
class AiohttpTransport(httpx.AsyncBaseTransport):
    def __init__(self, session: typing.Optional[aiohttp.ClientSession] = None):
        self._session = session or aiohttp.ClientSession()
        self._closed = False
    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        if self._closed:
            raise RuntimeError("Transport is closed")
        # Convert the httpx request headers into a plain dict for aiohttp
        headers = dict(request.headers)
        
        # Prepare the request parameters
        method = request.method
        url = str(request.url)
        content = request.content
        
        async with self._session.request(
            method=method,
            url=url,
            headers=headers,
            data=content,
            allow_redirects=False,
        ) as aiohttp_response:
            # Read the full response body
            content = await aiohttp_response.read()

            # Convert the response headers back to httpx's format
            headers = [(k.lower(), v) for k, v in aiohttp_response.headers.items()]

            # Build an httpx.Response from the aiohttp response
            return httpx.Response(
                status_code=aiohttp_response.status,
                headers=headers,
                content=content,
                request=request
            )
    async def aclose(self):
        if not self._closed:
            self._closed = True
            await self._session.close()
async def benchmark_httpx_with_aiohttp_transport(n):
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(
            timeout=10,
        ),
        transport=AiohttpTransport(),
    ) as client:
        start = time.time()
        tasks = []
        for i in range(n):
            tasks.append(request_with_httpx(client))
        await asyncio.gather(*tasks)
        return time.time() - start
    
async def run_benchmark(requests=1000, rounds=3):
    aiohttp_times = []
    httpx_times = []
    httpx_aio_times = []
    
    print(f"开始测试 {requests} 并发请求...")
    
    for i in range(rounds):
        print(f"\n第 {i+1} 轮测试:")
        
        # aiohttp 测试
        aiohttp_time = await benchmark_aiohttp(requests)
        aiohttp_times.append(aiohttp_time)
        print(f"aiohttp 耗时: {aiohttp_time:.2f} 秒")
        
        # 短暂暂停让系统冷却
        await asyncio.sleep(1)
        
        # httpx 测试
        httpx_time = await benchmark_httpx(requests)
        httpx_times.append(httpx_time)
        print(f"httpx 耗时: {httpx_time:.2f} 秒")
        # 短暂暂停让系统冷却
        await asyncio.sleep(1)
        
        # httpx 测试
        httpx_time = await benchmark_httpx_with_aiohttp_transport(requests)
        httpx_aio_times.append(httpx_time)
        print(f"httpx (aiohttp transport) 耗时: {httpx_time:.2f} 秒")
    
    print("\n测试结果汇总:")
    print(f"aiohttp 平均耗时: {statistics.mean(aiohttp_times):.2f} 秒")
    print(f"httpx 平均耗时: {statistics.mean(httpx_times):.2f} 秒")
    print(f"httpx 平均耗时: {statistics.mean(httpx_aio_times):.2f} 秒")
if __name__ == '__main__':
    # 运行基准测试
    asyncio.run(run_benchmark(512))
Results:
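One variable worth pinning down in this comparison is the connection pool size: httpx's default Limits allow at most 100 concurrent connections (with 20 keep-alive), and aiohttp's default TCPConnector is also capped at 100, so a burst of 512 requests queues inside both clients. Making the limits explicit, as sketched below with arbitrary example values inside the benchmark coroutines, rules pool size out as the source of the gap:
# Give both clients the same explicit pool size (512 is just an example value)
# so connection limits are not a hidden variable in the comparison.
async with httpx.AsyncClient(
    limits=httpx.Limits(max_connections=512, max_keepalive_connections=512),
    timeout=httpx.Timeout(timeout=10),
) as client:
    ...

async with aiohttp.ClientSession(
    connector=aiohttp.TCPConnector(limit=512),
) as session:
    ...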
I retested httpx and aiohttp and they are still quite different. You can also keep an eye on this httpx issue: https://github.com/encode/httpx/issues/3215
import uvicorn
from fastapi import FastAPI
import requests
import httpx
import aiohttp
app = FastAPI(description="sync and async route func test")
aio_client = httpx.AsyncClient()
aio_session: aiohttp.ClientSession | None = None
req_session = requests.Session()
@app.on_event("startup")
async def startup_event():
    global aio_session
    aio_session = aiohttp.ClientSession()
async def async_httpx_get(url):
    resp = await aio_client.get(url)
    return resp
async def async_aiohttp_get(url):
    async with aio_session.get(url) as resp:
        return await resp.text()
@app.get("/ping")
async def ping():
    return "pong"
@app.get("/async_httpx_get")
async def async_route_func():
    url = "https://juejin.cn/"
    resp = await async_httpx_get(url)
    return resp.text
@app.get("/async_aiohttp_get")
async def async_route_func():
    url = "https://juejin.cn/"
    resp_text = await async_aiohttp_get(url)
    return resp_text
@app.get("/async_route_use_sync_io_demo")
async def async_route_func():
    url = "https://juejin.cn/"
    resp = req_session.get(url)
    return resp.text
@app.get("/sync_route_func_demo")
def sync_route_func():
    url = "https://juejin.cn/"
    resp = req_session.get(url)
    return resp.text
def main():
    uvicorn.run(app, log_level="warning")
if __name__ == '__main__':
    main()
wrk test result
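One housekeeping note on the app code above: both shared clients are created but never closed, so aiohttp typically complains about an unclosed client session when the process exits. It does not affect the wrk numbers, but a shutdown handler in the same on_event style as the rest of the code cleans it up:
@app.on_event("shutdown")
async def shutdown_event():
    # Release the pooled connections held by the shared clients.
    await aio_client.aclose()
    if aio_session is not None:
        await aio_session.close()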
