 

Improving the speed of "sending 100,000 requests" with asyncio using multi-processing and multi-threading

First of all, I want to send multiple requests using one connection as fast as possible. The code below works fine and fast, but I want it to go beyond plain asynchronous execution. Back to my question: is it possible to run this in parallel using multi-threading or multi-processing? I heard that you could use ThreadPoolExecutor or ProcessPoolExecutor.

import random
import asyncio
from aiohttp import ClientSession
import time
from concurrent.futures import ProcessPoolExecutor

async def fetch(sem, url, session):
    # the semaphore caps how many requests are in flight at once
    async with sem:
        async with session.get(url) as response:
            return await response.read()

async def run(r):
    url = "http://www.example.com/"
    tasks = []
    sem = asyncio.Semaphore(1000)
    async with ClientSession() as session:
        for i in range(r):
            # note: url has no {} placeholder, so format(i) is a no-op here
            task = asyncio.ensure_future(fetch(sem, url.format(i), session))  # returns a Task
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

if __name__ == "__main__":
    number = 10000
    loop = asyncio.get_event_loop()
    start = time.time()
    loop.run_until_complete(run(number))
    end = time.time() - start
    print(end)

From testing, it managed to send roughly 10k requests in 49 seconds. I need it to be faster; any suggestions? (threads, processes)

asked Nov 08 '22 by Madwolf

1 Answer

ProcessPoolExecutor is a way to do real multi-processing. For your use case, it is basically as if you launched multiple copies of your program at the same time. If your machine has the bandwidth and CPU to spare, you should be able to improve performance by roughly a factor of 4 using ProcessPoolExecutor(max_workers=4).

You will, however, need an asyncio event loop in each subprocess, so you can do something like this:

import concurrent.futures

def main(n):
    # each subprocess runs its own event loop
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(n))


if __name__ == "__main__":  # guard is required for multiprocessing on spawn-based platforms (Windows, macOS)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as exc:
        exc.submit(main, 2500)
        exc.submit(main, 2500)
        exc.submit(main, 2500)
        exc.submit(main, 2500)
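
Each submit call returns a concurrent.futures.Future, so you can also collect results back in the parent process. A minimal sketch, assuming a hypothetical timed_main variant that returns its elapsed time:

import asyncio
import concurrent.futures
import time

def timed_main(n):
    # hypothetical variant of main() that returns its elapsed time
    start = time.time()
    asyncio.run(run(n))  # run() is the coroutine from the question
    return time.time() - start

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as exc:
        futures = [exc.submit(timed_main, 2500) for _ in range(4)]
        for fut in concurrent.futures.as_completed(futures):
            print(fut.result())  # elapsed seconds per subprocess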

As a side note on your run function: there is also no need to use ensure_future or Tasks here. The result of calling an async def function is a coroutine, which you can await directly or pass to asyncio.gather:

async def run(r):
    url = "http://www.example.com/"
    sem = asyncio.Semaphore(1000)
    async with ClientSession() as session:
        coros = [fetch(sem, url.format(i), session) for i in range(r)]
        await asyncio.gather(*coros)
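
For completeness, since you also asked about threads: the same pattern works with ThreadPoolExecutor, because each thread can host its own event loop via asyncio.run. A minimal sketch; keep in mind that for an I/O-bound workload like this, extra threads usually gain little over a single loop, since the loop is already non-blocking and the GIL serializes Python-level work:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def thread_main(n):
    # asyncio.run creates a fresh event loop in each thread
    asyncio.run(run(n))  # run() is the coroutine from the question

with ThreadPoolExecutor(max_workers=4) as exc:
    for _ in range(4):
        exc.submit(thread_main, 2500)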
answered Nov 14 '22 by Arthur