
How to reuse aiohttp ClientSession pool?

The docs say to reuse the ClientSession:

Don’t create a session per request. Most likely you need a session per application which performs all requests altogether.

A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.

But the docs don't seem to explain how to actually do this. There is one example that may be relevant, but it does not show how to reuse the pool elsewhere: http://aiohttp.readthedocs.io/en/stable/client.html#keep-alive-connection-pooling-and-cookie-sharing

Would something like this be the correct way to do it?

@app.listener('before_server_start')
async def before_server_start(app, loop):
    app.pg_pool = await asyncpg.create_pool(**DB_CONFIG, loop=loop, max_size=100)
    app.http_session_pool = aiohttp.ClientSession()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    app.http_session_pool.close()
    app.pg_pool.close()


@app.post("/api/register")
async def register(request):
    # json validation
    async with app.pg_pool.acquire() as pg:
        await pg.execute()  # create unactivated user in db
        async with app.http_session_pool as session:
            # TODO send activation email using SES API
            async with session.post('http://httpbin.org/post', data=b'data') as resp:
                print(resp.status)
                print(await resp.text())
        return HTTPResponse(status=204)
asked Oct 28 '17 by davidtgq



2 Answers

There are a few things I think can be improved:

1)

An instance of ClientSession is a single session object. That one session contains a pool of connections, but it is not a "session pool" itself. I would suggest renaming http_session_pool to http_session or maybe client_session.

2)

Session's close() method is a coroutine. You should await it:

await app.client_session.close()

Or, even better (IMHO), instead of thinking about how to properly open/close the session, use the standard async context manager protocol and await __aenter__ / __aexit__ yourself:

@app.listener('before_server_start')
async def before_server_start(app, loop):
    # ...
    app.client_session = await aiohttp.ClientSession().__aenter__()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    await app.client_session.__aexit__(None, None, None)
    # ...

3)

Pay attention to this info:

However, if the event loop is stopped before the underlying connection is closed, a ResourceWarning: unclosed transport warning is emitted (when warnings are enabled).

To avoid this situation, a small delay must be added before closing the event loop to allow any open underlying connections to close.

I'm not sure it's mandatory in your case, but there's no harm in adding await asyncio.sleep(0) inside after_server_stop, as the documentation advises:

@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    # ...
    await asyncio.sleep(0)  # http://aiohttp.readthedocs.io/en/stable/client.html#graceful-shutdown

Update:

A class that implements __aenter__ / __aexit__ can be used as an async context manager (i.e. in an async with statement). That allows some actions to run before the inner block executes and some after it, very much like a regular context manager, only asyncio-aware. And just like a regular context manager, an async one can also be driven directly (without async with) by manually awaiting __aenter__ / __aexit__.
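To make the protocol concrete, here is a tiny runnable sketch. The Resource class and the events list are illustrative only (not aiohttp API); it shows the same __aenter__ / __aexit__ pair driven both ways:

```python
import asyncio

events = []  # records the order of calls so the flow is visible

class Resource:
    """Toy async context manager; illustrative only, not part of aiohttp."""

    async def __aenter__(self):
        events.append("acquired")  # set-up work, e.g. opening a connection
        return self

    async def __aexit__(self, exc_type, exc, tb):
        events.append("released")  # tear-down work; runs even if the block raised

async def main():
    # Normal usage: "async with" awaits __aenter__ / __aexit__ for us.
    async with Resource():
        events.append("inside")

    # Manual usage: the same two coroutines awaited by hand, which is what
    # the before_server_start / after_server_stop listeners above do.
    res = await Resource().__aenter__()
    events.append("inside-manual")
    await res.__aexit__(None, None, None)

asyncio.run(main())
print(events)
```

Both paths produce the identical acquire/release sequence; the only difference is who awaits the two coroutines.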

Why do I think it's better to create/free the session using __aenter__ / __aexit__ manually instead of, for example, close()? Because we then don't need to worry about what actually happens inside __aenter__ / __aexit__. Imagine that in a future version of aiohttp the creation of a session changes so that you need to await open(), for example. If you use __aenter__ / __aexit__, you won't need to change your code at all.
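If you are on Python 3.7+, contextlib.AsyncExitStack packages exactly this "enter now, exit later" idea, so your listeners never touch the dunders directly. A minimal sketch, using a FakeSession stand-in so it runs without aiohttp or a server:

```python
import asyncio
from contextlib import AsyncExitStack

class FakeSession:
    """Stand-in for aiohttp.ClientSession so the sketch runs without aiohttp;
    only the async context-manager protocol matters here."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True

async def main():
    stack = AsyncExitStack()

    # "before_server_start": enter the session's context and keep it open;
    # whatever set-up __aenter__ performs (now or in future versions) runs here.
    session = await stack.enter_async_context(FakeSession())
    assert not session.closed  # reused by every request while serving

    # "after_server_stop": unwind everything the stack entered.
    await stack.aclose()
    return session

session = asyncio.run(main())
print("closed after shutdown:", session.closed)
```

The stack stores the __aexit__ side for you, so the shutdown listener is a single aclose() call no matter how session creation evolves.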

answered Nov 18 '22 by Mikhail Gerasimov


I found this question after searching on Google for how to reuse an aiohttp ClientSession instance, after my code triggered this warning message: UserWarning: Creating a client session outside of coroutine is a very dangerous idea

This code may not solve the above problem, though it is related. I am new to asyncio and aiohttp, so this may not be best practice. It's the best I could come up with after reading a lot of seemingly conflicting information.

I created a ResourceManager class, adapted from the Python docs, that opens a context.

The ResourceManager instance handles opening and closing the aiohttp ClientSession instance via the magic methods __aenter__ and __aexit__, using the BaseScraper.set_session and BaseScraper.close_session wrapper methods.

I was able to reuse a ClientSession instance with the following code.

The BaseScraper class also has methods for authentication. It depends on the lxml third-party package.

import asyncio
from time import time
from contextlib import contextmanager, AbstractContextManager, ExitStack

import aiohttp
import lxml.html


class ResourceManager(AbstractContextManager):
    # Code taken from Python docs: 29.6.2.4. of https://docs.python.org/3.6/library/contextlib.html

    def __init__(self, scraper, check_resource_ok=None):
        self.acquire_resource = scraper.acquire_resource
        self.release_resource = scraper.release_resource
        if check_resource_ok is None:

            def check_resource_ok(resource):
                return True

        self.check_resource_ok = check_resource_ok

    @contextmanager
    def _cleanup_on_error(self):
        with ExitStack() as stack:
            stack.push(self)
            yield
            # The validation check passed and didn't raise an exception
            # Accordingly, we want to keep the resource, and pass it
            # back to our caller
            stack.pop_all()

    def __enter__(self):
        resource = self.acquire_resource()
        with self._cleanup_on_error():
            if not self.check_resource_ok(resource):
                msg = "Failed validation for {!r}"
                raise RuntimeError(msg.format(resource))
        return resource

    def __exit__(self, *exc_details):
        # We don't need to duplicate any of our resource release logic
        self.release_resource()


class BaseScraper:
    login_url = ""
    login_data = dict()  # dict of key, value pairs to fill the login form
    loop = asyncio.get_event_loop()

    def __init__(self, urls):
        self.urls = urls
        self.acquire_resource = self.set_session
        self.release_resource = self.close_session

    async def _set_session(self):
        self.session = await aiohttp.ClientSession().__aenter__()

    def set_session(self):
        set_session_attr = self.loop.create_task(self._set_session())
        self.loop.run_until_complete(set_session_attr)
        return self  # variable after "as" becomes instance of BaseScraper

    async def _close_session(self):
        await self.session.__aexit__(None, None, None)

    def close_session(self):
        close_session = self.loop.create_task(self._close_session())
        self.loop.run_until_complete(close_session)

    def __call__(self):
        fetch_urls = self.loop.create_task(self._fetch())
        return self.loop.run_until_complete(fetch_urls)

    async def _get(self, url):
        async with self.session.get(url) as response:
            result = await response.read()
        return url, result

    async def _fetch(self):
        tasks = (self.loop.create_task(self._get(url)) for url in self.urls)
        start = time()
        results = await asyncio.gather(*tasks)
        print(
            "time elapsed: {} seconds \nurls count: {}".format(
                time() - start, len(self.urls)  # self.urls, not the global urls
            )
        )
        return results

    @property
    def form(self):
        """Create and return form for authentication."""
        form = aiohttp.FormData(self.login_data)
        get_login_page = self.loop.create_task(self._get(self.login_url))
        url, login_page = self.loop.run_until_complete(get_login_page)

        login_html = lxml.html.fromstring(login_page)
        hidden_inputs = login_html.xpath(r'//form//input[@type="hidden"]')
        login_form = {x.attrib["name"]: x.attrib["value"] for x in hidden_inputs}
        for key, value in login_form.items():
            form.add_field(key, value)
        return form

    async def _login(self, form):
        async with self.session.post(self.login_url, data=form) as response:
            if response.status != 200:
                response.raise_for_status()
            print("logged into {}".format(self.login_url))  # "url" was undefined here
            await response.release()

    def login(self):
        post_login_form = self.loop.create_task(self._login(self.form))
        self.loop.run_until_complete(post_login_form)


if __name__ == "__main__":
    urls = ("http://example.com",) * 10
    base_scraper = BaseScraper(urls)
    with ResourceManager(base_scraper) as scraper:
        for url, html in scraper():
            print(url, len(html))
answered Nov 18 '22 by dmmfll