I'm trying to log into a website simultaneously with multiple credentials using `aiohttp` and `asyncio`. In the `create_tasks` function I generate a list of sessions, one per login. The reason I cannot simply create a session inside the `login` function is that the same session object will be used throughout the rest of the code. What I'm trying to devise is a way to use a context manager to handle closing the sessions (to avoid the runtime errors that come from leaving them open).

The following code works as intended (concurrent fetching of the login page and parsing of the token in a process pool), but it creates the sessions separately from the tasks and requires me to close them all at the end:
```python
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import asyncio

# TODO: make this safe, handle exceptions
LOGIN_URL = "http://example.com/login"
CLIENT_CNT = 10

proc_pool = ProcessPoolExecutor(CLIENT_CNT)

def get_key(text):
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"})
    return key.get("value", None)

async def login(username: str, password: str, session: aiohttp.ClientSession,
                sem: asyncio.BoundedSemaphore, loop: asyncio.AbstractEventLoop = None):
    loop = loop or asyncio.get_event_loop()
    async with sem:
        async with session.get(LOGIN_URL) as resp:
            x = await asyncio.ensure_future(
                loop.run_in_executor(proc_pool, get_key, await resp.text()))
            print(x)

def create_tasks(usernames, passwords, sem: asyncio.BoundedSemaphore,
                 loop: asyncio.AbstractEventLoop = None):
    loop = loop or asyncio.get_event_loop()
    tasks = []
    sessions = []
    for u, p in zip(usernames, passwords):
        session = aiohttp.ClientSession(loop=loop)
        sessions.append(session)
        tasks.append(login(u, p, session, sem, loop))
    return tasks, sessions

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    sem = asyncio.BoundedSemaphore(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    tasks, sessions = create_tasks(usernames, passwords, sem, loop)
    loop.run_until_complete(asyncio.gather(*tasks, loop=loop))
    for session in sessions:
        session.close()
```
I previously made `create_tasks` a coroutine, wrote a wrapper class to produce async iterables, and tried using

```python
async with aiohttp.ClientSession() as session:
    tasks.append(login(u, p, session, sem, loop))
```

but, as I feared, the session was already closed by the time the task was run.
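To see why that happens, here is a minimal sketch (using a hypothetical `FakeSession` stand-in for `aiohttp.ClientSession`, so it runs without any network access) showing that the `async with` block exits, and closes the session, before the scheduled task ever gets a chance to run:

```python
import asyncio

class FakeSession:
    """Stand-in for aiohttp.ClientSession (assumption: same close-on-exit behavior)."""
    def __init__(self):
        self.closed = False
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        self.closed = True

async def use(session):
    await asyncio.sleep(0)
    return session.closed  # True: the session was closed before this task ran

async def main():
    async with FakeSession() as session:
        task = asyncio.ensure_future(use(session))
    # the async with has already exited (closing the session) by the time
    # the task is actually awaited
    return await task

print(asyncio.run(main()))  # True
```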
Here's a structure that makes reasoning easier:
```python
async def user(u, p, ...):
    """Everything a single user does"""
    auth = await login(u, p)
    await download_something(auth, ...)
    await post_something(auth, ...)

async def login(u, p):
    async with aiohttp.ClientSession() as session:
        async with session.get("http://xxx/login", ...) as r:
            data = await r.json()
            return data["something"]

async def download_xxx(...): ...

async def post_xxx(...): ...

async def everything():
    creds = [("u1", "p1"), ...]
    flows = [asyncio.ensure_future(user(*cred)) for cred in creds]
    for flow in flows:
        await flow
```
Caveat programmator: `aiohttp` appears to store cookies by default; make sure they don't cross-pollinate your user flows.

Bonus points for correct use of `asyncio.gather()` in the last async function.
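For the bonus points, the final coroutine could look something like this (a sketch with a stubbed `user` flow; `return_exceptions=True` is an assumption so that one failed login doesn't cancel the other flows):

```python
import asyncio

async def user(u, p):
    # hypothetical per-user flow; the real version would log in,
    # then download and post things with the resulting auth
    await asyncio.sleep(0)
    return f"{u}:done"

async def everything():
    creds = [("u1", "p1"), ("u2", "p2"), ("u3", "p3")]
    # gather runs all flows concurrently and returns results in input order;
    # return_exceptions=True collects failures instead of cancelling the rest
    return await asyncio.gather(
        *(user(u, p) for u, p in creds), return_exceptions=True
    )

print(asyncio.run(everything()))  # ['u1:done', 'u2:done', 'u3:done']
```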
Use `ExitStack`.

```python
from contextlib import ExitStack

def create_tasks(..., context):
    tasks = []
    for username in usernames:
        session = aiohttp.ClientSession()
        tasks.append(...)
        context.enter_context(session)
    return tasks

if __name__ == "__main__":
    context = ExitStack()
    tasks = create_tasks(..., context)
    with context:
        loop.run_until_complete(asyncio.gather(*tasks))
```
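On Python 3.7+, the same idea is available for async context managers via `contextlib.AsyncExitStack`, which closes everything it entered when the `async with` block exits. A minimal sketch, using a hypothetical `FakeSession` stand-in for `aiohttp.ClientSession` so it runs without the network:

```python
import asyncio
from contextlib import AsyncExitStack

class FakeSession:
    """Stand-in for aiohttp.ClientSession (assumption: closes on __aexit__)."""
    def __init__(self, name):
        self.name = name
        self.closed = False
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        self.closed = True

async def main():
    async with AsyncExitStack() as stack:
        sessions = [await stack.enter_async_context(FakeSession(n)) for n in "abc"]
        # ... run the login tasks here while every session is still open ...
        assert not any(s.closed for s in sessions)
    # leaving the async with closes every entered session, in reverse order
    return sessions

sessions = asyncio.run(main())
print(all(s.closed for s in sessions))  # True
```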