First of all, I want to send multiple requests over a single connection as fast as possible. The code below works fine and is fast, but I want to go beyond plain asynchronous execution. Back to my question: is it possible to run this in parallel using multi-threading or multi-processing? I heard that you could use ThreadPoolExecutor or ProcessPoolExecutor.
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor  # not used yet; this is what I'm asking about

from aiohttp import ClientSession


async def fetch(sem, url, session):
    async with sem:
        async with session.get(url) as response:
            return await response.read()


async def run(r):
    url = "http://www.example.com/"
    tasks = []
    sem = asyncio.Semaphore(1000)  # cap the number of requests in flight
    async with ClientSession() as session:
        for i in range(r):
            task = asyncio.ensure_future(fetch(sem, url.format(i), session))  # returns a Task
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses


if __name__ == "__main__":
    number = 10000
    loop = asyncio.get_event_loop()
    start = time.time()
    loop.run_until_complete(run(number))
    end = time.time() - start
    print(end)
From testing, it managed to send roughly 10k requests in 49 seconds. I need it to be faster; any suggestions? (threads, processes)
The ProcessPoolExecutor is a way to do real multi-processing. For your use case, it is basically as if you launched multiple copies of your program at the same time. If your machine has the bandwidth and CPU to spare, you should be able to improve performance by a factor of 4 by using a ProcessPoolExecutor(max_workers=4).
You will, however, need an asyncio event loop in each of the subprocesses, so you can do something like this:
import concurrent.futures


def main(n):
    # each worker process runs its own event loop
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(n))


if __name__ == "__main__":  # guard so worker processes can import the module safely
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as exc:
        # four workers, each sending a quarter of the requests
        exc.submit(main, 2500)
        exc.submit(main, 2500)
        exc.submit(main, 2500)
        exc.submit(main, 2500)
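If you prefer driving the split from one place, executor.map works too. Here is a minimal self-contained sketch of the whole parallel setup, assuming 4 workers evenly splitting the 10000 requests (the worker count and the example.com URL are placeholders to tune for your machine and target):

import asyncio
import concurrent.futures
import time

from aiohttp import ClientSession


async def fetch(sem, url, session):
    async with sem:
        async with session.get(url) as response:
            return await response.read()


async def run(r):
    sem = asyncio.Semaphore(1000)  # per-process cap on in-flight requests
    url = "http://www.example.com/"
    async with ClientSession() as session:
        coros = [fetch(sem, url, session) for _ in range(r)]
        await asyncio.gather(*coros)


def main(n):
    # one event loop per worker process
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(n))


if __name__ == "__main__":
    workers = 4   # assumption: tune to your core count and bandwidth
    total = 10000
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as exc:
        # map blocks until every worker has finished its share
        list(exc.map(main, [total // workers] * workers))
    print(time.time() - start)

Note that exiting the with block also waits for all submitted work, so the list() call mainly makes any worker exception surface in the parent process.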
As a side note on your run function: there is also no need for you to use ensure_future or Tasks. The result of an async def function is a coroutine, which you can directly await or pass to asyncio.gather:
async def run(r):
    url = "http://www.example.com/"
    sem = asyncio.Semaphore(1000)
    async with ClientSession() as session:
        coros = [fetch(sem, url.format(i), session) for i in range(r)]
        await asyncio.gather(*coros)
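Under the hood, asyncio.gather wraps each coroutine in a Task anyway, so this behaves exactly like the ensure_future version; the Semaphore(1000) still caps how many requests are in flight at once.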