I recently started learning asyncio and ran into a problem: my code never terminates, and I can't figure out why. Please help!
import signal
import sys
import asyncio
import aiohttp
import json
loop = asyncio.get_event_loop()
client = aiohttp.ClientSession(loop=loop)
async def get_json(client, url):
    async with client.get(url) as response:
        assert response.status == 200
        return await response.read()
async def get_reddit_cont(subreddit, client):
    data1 = await get_json(client, 'https://www.reddit.com/r/' + subreddit + '/top.json?sort=top&t=day&limit=50')
    jn = json.loads(data1.decode('utf-8'))
    print('DONE:', subreddit)
def signal_handler(signal, frame):
    loop.stop()
    client.close()
    sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
for key in {'python':1, 'programming':2, 'compsci':3}:
    asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_forever()
Result:
DONE: compsci
DONE: programming
DONE: python
...
I tried the following instead, but it didn't work reliably:
future = []
for key in {'python':1, 'programming':2, 'compsci':3}:
    future=asyncio.ensure_future(get_reddit_cont(key, client))
loop.run_until_complete(future)
Result (1 task instead of 3):
DONE: compsci
[Finished in 1.5s]
I then tried to solve it this way. I added:
async with aiohttp.ClientSession() as client:
inside:
async def get_reddit_cont(subreddit, client):
and changed the entry point to:
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = [get_reddit_cont(subreddit, client) for subreddit in range(1, 6)]
    result = loop.run_until_complete(asyncio.gather(*futures))
But when the code is completed, I get the message:
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x034021F0>
[Finished in 1.0s]
I don't understand why this is happening.
Also, when the "for key" loop runs about 60 or more times, I get an error:
...
aiohttp.client_exceptions.ClientOSError: [WinError 10054] An existing connection was forcibly closed by the remote host
Here are a few suggested changes, with context in the comments.
Unless you really have a unique use-case, or are just experimenting for learning's sake, there probably shouldn't be a reason to use signal: asyncio has top-level functions that let you decide when to close and terminate the event loop.
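To illustrate that point, here is a minimal sketch using only the standard library. It assumes a hypothetical fake_fetch() coroutine that sleeps instead of making an HTTP request, so the focus stays on how the loop is started and shut down: asyncio.run() runs main() to completion and then closes the loop by itself, with no signal handler involved.

```python
import asyncio


async def fake_fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for an HTTP request: it just sleeps, then returns.
    await asyncio.sleep(delay)
    return f"DONE: {name}"


async def main() -> list:
    # gather() runs all three coroutines concurrently and returns once every
    # one of them has finished; results come back in argument order.
    return await asyncio.gather(
        fake_fetch("python", 0.03),
        fake_fetch("programming", 0.02),
        fake_fetch("compsci", 0.01),
    )


if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion,
    # then closes the loop, so the script exits on its own.
    for line in asyncio.run(main()):
        print(line)
```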
import asyncio
import logging
import sys
import aiohttp
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s:%(message)s')
URL = 'https://www.reddit.com/r/{subreddit}/top.json?sort=top&t=day&limit=50'
async def get_json(client: aiohttp.ClientSession, url: str) -> dict:
    # If you're going to be making repeated requests, use this
    # over .get(), which is just a wrapper around `.request()` and
    # involves an unneeded lookup
    async with client.request('GET', url) as response:
        # Raise if the response code is >= 400.
        # Some 2xx codes other than 200 may still be "ok".
        # You can also pass raise_for_status within
        # client.request().
        response.raise_for_status()
        # Read and decode the body in one step. The decoding itself
        # still calls json.loads() synchronously, so this does largely
        # the same thing you're doing now, just more concisely:
        # https://github.com/aio-libs/aiohttp/blob/76268e31630bb8615999ec40984706745f7f82d1/aiohttp/client_reqrep.py#L985
        j = await response.json()
        logging.info('DONE: got %s, size %s', url, j.__sizeof__())
        return j
async def get_reddit_cont(keys, **kwargs) -> list:
    async with aiohttp.ClientSession(**kwargs) as session:
        # Use a single session as a context manager.
        # This enables connection pooling, which matters a lot when
        # you're only talking to one site.
        tasks = []
        for key in keys:
            # create_task: Python 3.7+
            task = asyncio.create_task(
                get_json(session, URL.format(subreddit=key)))
            tasks.append(task)
        # The result of this will be a list of dictionaries.
        # It will only return when all of your subreddits
        # have given you a response & been decoded.
        #
        # To process greedily, use asyncio.as_completed()
        return await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == '__main__':
    default = ('python', 'programming', 'compsci')
    keys = sys.argv[1:] if len(sys.argv) > 1 else default
    # asyncio.run() creates the loop, runs the coroutine, and closes the
    # loop when it finishes, so the script terminates on its own.
    results = asyncio.run(get_reddit_cont(keys=keys))
Output:
$ python3 asyncreddit.py
2018-11-07 21:44:49,495:Using selector: KqueueSelector
2018-11-07 21:44:49,653:DONE: got https://www.reddit.com/r/compsci/top.json?sort=top&t=day&limit=50, size 216
2018-11-07 21:44:49,713:DONE: got https://www.reddit.com/r/python/top.json?sort=top&t=day&limit=50, size 216
2018-11-07 21:44:49,947:DONE: got https://www.reddit.com/r/programming/top.json?sort=top&t=day&limit=50, size 216
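The comment in the code above mentions asyncio.as_completed() for processing results greedily. A minimal sketch of that pattern follows; fake_fetch() and process_greedily() are hypothetical names, and fake_fetch() sleeps instead of calling get_json() so the example runs without network access.

```python
import asyncio


async def fake_fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for get_json(): sleeps instead of doing HTTP.
    await asyncio.sleep(delay)
    return name


async def process_greedily(jobs: dict) -> list:
    # Wrap each coroutine in a task so they all start running right away.
    tasks = [asyncio.create_task(fake_fetch(n, d)) for n, d in jobs.items()]
    finished = []
    # as_completed() yields awaitables in the order the tasks finish,
    # so each result can be handled as soon as it is ready.
    for next_done in asyncio.as_completed(tasks):
        name = await next_done
        print("DONE:", name)
        finished.append(name)
    return finished


if __name__ == "__main__":
    jobs = {"python": 0.3, "programming": 0.2, "compsci": 0.1}
    asyncio.run(process_greedily(jobs))
```

With gather() you get all results at once, in the order you passed the tasks in; with as_completed() you can act on each result as soon as it arrives, but that ordering is lost.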
Edit: regarding this part of your question:
"But when the code is completed, I get the message: Unclosed client session"
This is because you need to .close() the client object, just as you would a file object. You can do that in two ways:
1. Call client.close() explicitly. In aiohttp 3.x, close() is a coroutine, so it has to be awaited: await client.close(). It is safer to wrap this in a try/finally block to make sure that the session is closed no matter what.
2. Use the client as an async context manager, as in the code above. Once the async with block is over, the session is automatically closed via its .__aexit__() method.
The connector is the underlying TCPConnector, which is an attribute of the session. It handles the connection pooling, and it's what ultimately is left open in your code.
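A minimal sketch of both approaches, assuming aiohttp 3.x (where ClientSession.close() is a coroutine). The function names are illustrative and no requests are actually made; the point is only that session.closed is True afterwards in both cases.

```python
import asyncio

import aiohttp


async def explicit_close() -> aiohttp.ClientSession:
    session = aiohttp.ClientSession()
    try:
        # ... make requests with session.get(...) / session.request(...) here ...
        pass
    finally:
        # close() is a coroutine in aiohttp 3.x, so it must be awaited.
        # The finally clause runs even if a request above raises.
        await session.close()
    return session


async def context_manager_close() -> aiohttp.ClientSession:
    # __aexit__ closes the session (and the connector it owns) when the block ends.
    async with aiohttp.ClientSession() as session:
        pass  # ... requests go here ...
    return session


async def main() -> None:
    for make in (explicit_close, context_manager_close):
        session = await make()
        print(make.__name__, "closed:", session.closed)


if __name__ == "__main__":
    asyncio.run(main())
```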
The answer lies in your code. Here's the clue: loop.run_forever(). As the name says, it runs the loop until something stops it, so you will need to call loop.stop() yourself. I would use a condition, such as an if clause or a while loop:
if we_have_what_we_need:
    signal_handler(signal.SIGINT, None)  # the question's handler, which calls loop.stop()
or
while we_dont_have_what_we_need:
    loop.run_forever()
The first will stop your code when the condition is met. The latter will keep going until the condition is met.
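Here is one way to express the "stop when we have what we need" condition with the standard library, as a sketch: each task gets a done callback, and when the last one finishes, the callback calls loop.stop(), so run_forever() returns. fake_fetch() and run_until_all_done() are hypothetical names; in the question's code, fake_fetch() would be get_reddit_cont().

```python
import asyncio


async def fake_fetch(name: str) -> str:
    # Hypothetical stand-in for get_reddit_cont(): no real HTTP request.
    await asyncio.sleep(0.01)
    return name


def run_until_all_done(names: list) -> list:
    if not names:
        # Nothing to wait for; run_forever() would otherwise never return.
        return []
    loop = asyncio.new_event_loop()
    try:
        tasks = [loop.create_task(fake_fetch(n)) for n in names]
        pending = set(tasks)

        def on_done(task: asyncio.Task) -> None:
            pending.discard(task)
            # The "we have what we need" condition: every task has finished.
            if not pending:
                loop.stop()

        for t in tasks:
            t.add_done_callback(on_done)
        # Blocks until on_done() calls loop.stop().
        loop.run_forever()
        return [t.result() for t in tasks]
    finally:
        loop.close()


if __name__ == "__main__":
    print(run_until_all_done(["python", "programming", "compsci"]))
```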
[UPDATE]
We can also use:
loop.run_until_complete(future)
    Run until the future (an instance of Future) has completed.
    If the argument is a coroutine object it is implicitly scheduled to run as an asyncio.Task.
    Return the Future's result or raise its exception.
loop.run_forever()
    Run the event loop until stop() is called.
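Applied to the second attempt in the question (which only waited for the last future created in the for loop), a sketch might wrap all the coroutines in asyncio.gather() inside a single coroutine and pass that coroutine to run_until_complete(), which schedules it as a Task as described above. fake_get_reddit_cont() and fetch_all() are hypothetical names; fake_get_reddit_cont() sleeps instead of calling Reddit.

```python
import asyncio


async def fake_get_reddit_cont(subreddit: str) -> str:
    # Hypothetical stand-in for the question's get_reddit_cont(): no HTTP.
    await asyncio.sleep(0.01)
    return "DONE: " + subreddit


async def fetch_all(keys: list) -> list:
    # One awaitable that completes only when every subreddit is done,
    # instead of keeping a reference to just the last future.
    return await asyncio.gather(*(fake_get_reddit_cont(k) for k in keys))


def main(keys: list) -> list:
    loop = asyncio.new_event_loop()
    try:
        # Passing a coroutine: run_until_complete() wraps it in a Task,
        # runs the loop until it finishes, and returns its result.
        return loop.run_until_complete(fetch_all(keys))
    finally:
        loop.close()


if __name__ == "__main__":
    for line in main(["python", "programming", "compsci"]):
        print(line)
```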