Get JSON using Python and AsyncIO

Not long ago I started learning asyncio, and I've run into a problem: my code never terminates. I can't figure out why. Help me, please!

import signal
import sys
import asyncio
import aiohttp
import json

loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)

async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()

async def get_reddit_cont(subreddit, client):
    data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=50')

    jn = json.loads(data1.decode('utf-8'))

    print('DONE:', subreddit)

def signal_handler(signal, frame):
    loop.stop()
    client.close()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

for key in {'python':1, 'programming':2, 'compsci':3}:
    asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_forever()

Result:

DONE: compsci  
DONE: programming  
DONE: python  
...

I tried another approach, but the result was not stable.

future = []
for key in {'python':1, 'programming':2, 'compsci':3}:
    future=asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_until_complete(future)

Result (1 task instead of 3):

DONE: compsci  
[Finished in 1.5s]  
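(For what it's worth, the loop above rebinds `future` on every iteration, so `run_until_complete` only waits for the last task scheduled. Collecting all the futures and passing them to `asyncio.gather()` waits for all three. A minimal sketch of the fix, with a dummy coroutine standing in for the HTTP call:)

```python
import asyncio

async def fake_fetch(name):
    # Stand-in for get_reddit_cont(); just yields control once.
    await asyncio.sleep(0)
    return name

async def main():
    # Keep *all* the futures, instead of rebinding one variable.
    tasks = [asyncio.ensure_future(fake_fetch(k))
             for k in ('python', 'programming', 'compsci')]
    # gather() waits for every task and preserves argument order.
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)
```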

I worked around it in this way:

I added:

async with aiohttp.ClientSession() as client:

inside:

async def get_reddit_cont(subreddit, client):

and ran it with:

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = [get_reddit_cont(subreddit,client) for subreddit in range(1,6)]
    result = loop.run_until_complete(asyncio.gather(*futures))

But when the code is completed, I get the message:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x034021F0>
[Finished in 1.0s]

I don't understand why this is happening.

But when I run the "for key" loop about 60 or more times, I get an error:

...
aiohttp.client_exceptions.ClientOSError: [WinError 10054] Remote host forcibly terminated an existing connection

asked Nov 07 '18 by idrees
2 Answers

Here are a few suggested changes, with context in the comments.

Unless you really have a unique use-case, or are just experimenting for learning's sake, there probably shouldn't be a reason to use signal -- asyncio has top-level functions that let you decide when to close and terminate the event loop.

import asyncio
import logging
import sys

import aiohttp

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s:%(message)s')

URL = 'https://www.reddit.com/r/{subreddit}/top.json?sort=top&t=day&limit=50'


async def get_json(client: aiohttp.ClientSession, url: str) -> dict:
    # If you're going to be making repeated requests, use this
    # over .get(), which is just a wrapper around `.request()` and
    # involves an unneeded lookup
    async with client.request('GET', url) as response:

        # Raise if the response code is >= 400.
        # Some 200 codes may still be "ok".
        # You can also pass raise_for_status within
        # client.request().
        response.raise_for_status()

        # Let your code be fully async.  The call to json.loads()
        # is blocking and won't take full advantage.
        #
        # And it does largely the same thing you're doing now:
        # https://github.com/aio-libs/aiohttp/blob/76268e31630bb8615999ec40984706745f7f82d1/aiohttp/client_reqrep.py#L985
        j = await response.json()
        logging.info('DONE: got %s, size %s', url, j.__sizeof__())
        return j


async def get_reddit_cont(keys, **kwargs) -> list:
    async with aiohttp.ClientSession(**kwargs) as session:
        # Use a single session as a context manager.
        # this enables connection pooling, which matters a lot when
        # you're only talking to one site
        tasks = []
        for key in keys:
            # create_task: Python 3.7+
            task = asyncio.create_task(
                get_json(session, URL.format(subreddit=key)))
            tasks.append(task)
        # The result of this will be a list of dictionaries
        # It will only return when all of your subreddits
        # have given you a response & been decoded
        #
        # To process greedily, use asyncio.as_completed()
        return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    default = ('python', 'programming', 'compsci')
    keys = sys.argv[1:] if len(sys.argv) > 1 else default
    asyncio.run(get_reddit_cont(keys=keys))

Output:

$ python3 asyncreddit.py 
2018-11-07 21:44:49,495:Using selector: KqueueSelector
2018-11-07 21:44:49,653:DONE: got https://www.reddit.com/r/compsci/top.json?sort=top&t=day&limit=50, size 216
2018-11-07 21:44:49,713:DONE: got https://www.reddit.com/r/python/top.json?sort=top&t=day&limit=50, size 216
2018-11-07 21:44:49,947:DONE: got https://www.reddit.com/r/programming/top.json?sort=top&t=day&limit=50, size 216
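The `asyncio.as_completed()` alternative mentioned in the comments processes each result as soon as it arrives, instead of waiting for the whole batch. A rough sketch with stand-in coroutines (no real HTTP involved):

```python
import asyncio

async def fetch(name, delay):
    # Stand-in for get_json(); finishes after `delay` seconds.
    await asyncio.sleep(delay)
    return name

async def main():
    coros = [fetch('slow', 0.02), fetch('fast', 0.0)]
    finished = []
    # Iterate in completion order, not submission order.
    for fut in asyncio.as_completed(coros):
        finished.append(await fut)
    return finished

order = asyncio.run(main())
print(order)
```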

Edit: from your question:

But when the code is completed, I get the message: Unclosed client session

This is because you need to .close() the client object, just as you would a file object. You can do that two ways:

  • Call it explicitly: await client.close() (in current aiohttp versions, .close() is a coroutine). It is safer to wrap this in a try/finally block to make sure it's closed no matter what
  • Or (easier way), use the client as an async context manager, as in this answer. This means that, after the async with block is over, the session is automatically closed via its .__aexit__() method.
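To make the two options concrete, here is a small sketch of the pattern. It uses a dummy session class rather than `aiohttp.ClientSession`, purely so the shape of the two approaches is visible; the real session behaves the same way:

```python
import asyncio

class FakeSession:
    # Stand-in for aiohttp.ClientSession, just to show the pattern.
    def __init__(self):
        self.closed = False
    async def close(self):
        self.closed = True
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        # The context manager closes the session for you.
        await self.close()

async def explicit_close():
    session = FakeSession()
    try:
        pass  # ... make requests with the session ...
    finally:
        await session.close()  # aiohttp's close() is also a coroutine
    return session.closed

async def context_manager_close():
    async with FakeSession() as session:
        pass  # ... make requests ...
    return session.closed

closed_a = asyncio.run(explicit_close())
closed_b = asyncio.run(context_manager_close())
```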

The connector is the underlying TCPConnector, which is an attribute of the session. It handles the connection pooling, and it's what ultimately is left open in your code.
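Relatedly, the WinError 10054 at ~60 requests from the question is typically the remote server dropping connections because too many are opened at once. A common mitigation (my suggestion here, not something from the original post) is to cap concurrency, either via `aiohttp.TCPConnector(limit=...)` on the session or with an `asyncio.Semaphore` around each request. A sketch of the semaphore approach, with a dummy coroutine in place of the HTTP call:

```python
import asyncio

MAX_CONCURRENT = 10  # arbitrary cap; tune for the target server

async def fetch(i, sem):
    # At most MAX_CONCURRENT of these run at the same time.
    async with sem:
        await asyncio.sleep(0)  # stand-in for the aiohttp request
        return i

async def main(n):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    return await asyncio.gather(*(fetch(i, sem) for i in range(n)))

results = asyncio.run(main(60))
```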

answered Oct 27 '22 by Brad Solomon

The answer lies in your code; here's the clue: loop.run_forever(). So you will need to call loop.stop(), guarded by a condition such as an if clause or a while loop.

if we_have_what_we_need:
    signal_handler(signal, frame)

or

while we_dont_have_what_we_need:
    loop.run_forever()

The first will stop your code when the condition is met. The latter will keep going until the condition is met.

[UPDATE]

We can also use;

(Python Docs)

loop.run_until_complete(future)

Run until the future (an instance of Future) has completed.

If the argument is a coroutine object it is implicitly scheduled to run as an asyncio.Task.

Return the Future’s result or raise its exception.

loop.run_forever()

Run the event loop until stop() is called.
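A minimal sketch of run_forever() plus a scheduled stop(), which is roughly what run_until_complete() does for you under the hood:

```python
import asyncio

results = []

async def work():
    # Stand-in for the real coroutine being run.
    results.append('done')

loop = asyncio.new_event_loop()
# Schedule the coroutine, then stop the loop once it finishes.
task = loop.create_task(work())
task.add_done_callback(lambda t: loop.stop())
loop.run_forever()  # returns only after stop() is called
loop.close()
```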

answered Oct 27 '22 by Jamie Lindsey