aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore for limiting the number of concurrent connections


I thought I'd like to learn the new Python async/await syntax, and more specifically the asyncio module, by writing a simple script that lets you download multiple resources at once.

But now I'm stuck.

While researching I came across two options to limit the number of concurrent requests:

  1. Passing an aiohttp.TCPConnector (with a limit argument) to an aiohttp.ClientSession, or
  2. Using an asyncio.Semaphore.

Is there a preferred option, or can they be used interchangeably if all you want is to limit the number of concurrent connections? Are they (roughly) equal in terms of performance?

Also, both seem to have a default value of 100 concurrent connections/operations. If I use only a Semaphore with a limit of, let's say, 500, will the aiohttp internals implicitly lock me down to 100 concurrent connections?

This is all very new and unclear to me. Please feel free to point out any misunderstandings on my part or flaws in my code.

Here is my code currently containing both options (which should I remove?):

Bonus Questions:

  1. How do I handle coros that raised an error (preferably retrying x times)?
  2. What is the best way to save the returned data (inform my DataHandler) as soon as a coro is finished? I don't want it all to be saved at the end, because I want to start working with the results as soon as possible.


import asyncio
from tqdm import tqdm
import uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

        self.concurrent_connections = concurrent_connections
        self.silent = silent

        self.data_handler = data_handler or DummyDataHandler()

        self.sending_bar = None
        self.receiving_bar = None

        asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.semaphore = asyncio.Semaphore(concurrent_connections)

    async def fetch(self, session, url):
        # This is option 1: The semaphore, limiting the number of concurrent coros,
        # thereby limiting the number of concurrent requests.
        async with self.semaphore:
            async with session.get(url) as response:
                # Bonus Question 1: What is the best way to retry a request that failed?
                resp_task = asyncio.ensure_future(response.read())
                self.sending_bar.update(1)
                resp = await resp_task

                await response.release()
                if not self.silent:
                    self.receiving_bar.update(1)
                return resp

    async def batch_download(self, urls, auth=None):
        # This is option 2: Limiting the number of open connections directly via the TCPConnector
        conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
        async with ClientSession(connector=conn, auth=auth) as session:
            await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

    async def download_and_save(self, session, url):
        content_task = asyncio.ensure_future(self.fetch(session, url))
        content = await content_task
        # Bonus Question 2: This is blocking, I know. Should this be wrapped in another coro
        # or should I use something like asyncio.as_completed in the download function?
        self.data_handler.take(origin_url=url, data=content)

    def download(self, urls, auth=None):
        if isinstance(auth, tuple):
            auth = BasicAuth(*auth)
        print('Running on concurrency level {}'.format(self.concurrent_connections))
        self.sending_bar = tqdm(urls, total=len(urls), desc='Sent    ', unit='requests')
        self.sending_bar.update(0)

        self.receiving_bar = tqdm(urls, total=len(urls), desc='Received', unit='requests')
        self.receiving_bar.update(0)

        tasks = self.batch_download(urls, auth)
        self.loop.run_until_complete(tasks)
        return self.data_handler.done()


### call like so ###

URL_PATTERN = 'https://www.example.com/{}.html'

def gen_url(lower=0, upper=None):
    for i in range(lower, upper):
        yield URL_PATTERN.format(i)   

ad = AsyncDownloader(concurrent_connections=30)
data = ad.download(list(gen_url(upper=1000)))
asked Aug 18 '17 by LLC



1 Answer

Is there a preferred option?

Yes, see below:

will the aiohttp internals lock me down to 100 concurrent connections implicitly?

Yes, the default value of 100 will lock you down, unless you specify another limit. You can see it in the source here: https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L1084
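
For illustration, here is a minimal sketch (mine, not part of the original answer) in which the connector limit is raised to match a Semaphore of 500, so that 500 actually becomes the effective bound:

import asyncio
from aiohttp import ClientSession, TCPConnector

async def main(urls):
    # Raise the connector limit to match the Semaphore; otherwise the
    # connector's default of 100 stays the effective ceiling.
    conn = TCPConnector(limit=500)
    semaphore = asyncio.Semaphore(500)

    async with ClientSession(connector=conn) as session:

        async def fetch(url):
            async with semaphore:
                async with session.get(url) as response:
                    return await response.read()

        return await asyncio.gather(*(fetch(u) for u in urls))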

Are they (roughly) equal in terms of performance?

No (though the difference in performance should be negligible). Since aiohttp.TCPConnector checks for available connections anyway, whether or not it is surrounded by a Semaphore, using a Semaphore here would just add unnecessary overhead. Applied to the code in the question, that means the Semaphore in fetch is the part to remove, keeping only the TCPConnector limit in batch_download, as sketched below.
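
A sketch of the connector-only version of fetch (simplified; the sending_bar update and the explicit release from the question are dropped):

async def fetch(self, session, url):
    # No Semaphore needed here: the TCPConnector(limit=...) passed to the
    # ClientSession in batch_download already bounds concurrency.
    async with session.get(url) as response:
        resp = await response.read()
        if not self.silent:
            self.receiving_bar.update(1)
        return resp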

How do I handle coros that raised an error (preferably retrying x times)?

I don't believe there is a standard way to do so, but one solution would be to wrap your calls in a method like this:

async def retry_requests(...):
    # Retry up to 5 times; if every attempt fails, the loop ends and
    # the coroutine implicitly returns None.
    for i in range(5):
        try:
            return await session.get(...)
        except aiohttp.ClientResponseError:
            pass
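
As for the second bonus question, one option is asyncio.as_completed, which the comments in the question's code already hint at. A sketch (my own, reusing the fetch method and the DataHandler.take interface from the question) that hands each result to the handler as soon as its coroutine finishes:

async def download_and_save_all(self, session, urls):

    async def fetch_with_url(url):
        # Pair each result with its URL, since completion order is
        # not the order the URLs were submitted in.
        return url, await self.fetch(session, url)

    # as_completed yields awaitables in completion order, so each result
    # reaches the DataHandler the moment it is ready, instead of all at
    # once after gather() returns.
    for future in asyncio.as_completed([fetch_with_url(u) for u in urls]):
        url, content = await future
        # If DataHandler.take does real blocking I/O, consider pushing
        # it off the event loop with loop.run_in_executor instead.
        self.data_handler.take(origin_url=url, data=content)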
answered Oct 17 '22 by iCart