Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to prevent raise asyncio.TimeoutError and continue the loop

I'm using aiohttp with limited_as_completed method to speed up scrapping (around 100 million static website pages). However, the code stops after several minutes, and returns the TimeoutError. I tried several things, but still could not prevent the raise asyncio.TimeoutError. May I ask how can I ignore the error, and continue?

The code I'm running is:

N=123
import html
from lxml import etree
import requests
import asyncio 
import aiohttp
from aiohttp import ClientSession, TCPConnector
import pandas as pd
import re 
import csv 
import time
from itertools import islice
import sys
from contextlib import suppress

start = time.time()
data = {}
data['name'] = []
filename = "C:\\Users\\xxxx"+ str(N) + ".csv"

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

async def get_info_byid(i, url, session):
    async with session.get(url,timeout=20) as resp:
        print(url)
        with suppress(asyncio.TimeoutError):
            r = await resp.text()
            name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
            data['name'].append(name)
            dataframe = pd.DataFrame(data)
            dataframe.to_csv(filename, index=False, sep='|')

limit = 1000
async def print_when_done(tasks):
    for res in limited_as_completed(tasks, limit):
        await res

url = "http://xxx.{}.html"
loop = asyncio.get_event_loop()

async def main():
    connector = TCPConnector(limit=10)
    async with ClientSession(connector=connector,headers=headers,raise_for_status=False) as session:
        coros = (get_info_byid(i, url.format(i), session) for i in range(N,N+1000000))
        await print_when_done(coros)

loop.run_until_complete(main())
loop.close()
print("took", time.time() - start, "seconds.")

The error log is:

Traceback (most recent call last):
  File "C:\Users\xxx.py", line 111, in <module>
    loop.run_until_complete(main())
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 573, in run_until_complete
    return future.result()
  File "C:\Users\xxx.py", line 109, in main
    await print_when_done(coros)
  File "C:\Users\xxx.py", line 98, in print_when_done
    await res
  File "C:\Users\xxx.py", line 60, in first_to_finish
    return f.result()
  File "C:\Users\xxx.py", line 65, in get_info_byid
    async with session.get(url,timeout=20) as resp:
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client.py", line 855, in __aenter__
    self._resp = await self._coro
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client.py", line 391, in _request
    await resp.start(conn)
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client_reqrep.py", line 770, in start
    self._continue = None
  File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\helpers.py", line 673, in __exit__
    raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError

I have tried 1) add expect asyncio.TimeoutError: pass. Not working

async def get_info_byid(i, url, session):
    async with session.get(url,timeout=20) as resp:
        print(url)
        try:
            r = await resp.text()
            name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
            data['name'].append(name)
            dataframe = pd.DataFrame(data)
            dataframe.to_csv(filename, index=False, sep='|')
        except asyncio.TimeoutError:
            pass

2) suppress(asyncio.TimeoutError)as shown above. Not working

I just learned aiohttp yesterday, so maybe there is other things wrong in my code that causes timeout error only after a few minutes' running? Thank you very much if anyone knows how to deal with it!

like image 250
SiriusBlack Avatar asked Oct 29 '18 16:10

SiriusBlack


2 Answers

what @Yurii Kramarenko has done will raise Unclosed client session excecption for sure, since the session has never be properly closed. What I recommend is sth like this:

import asyncio
import aiohttp

async def main(urls):
    async with aiohttp.ClientSession(timeout=self.timeout) as session:
        tasks=[self.do_something(session,url) for url in urls]
        await asyncio.gather(*tasks)
like image 67
jbxiaoyu Avatar answered Oct 12 '22 11:10

jbxiaoyu


Simple example (not very good, but works fine):

import asyncio
from aiohttp.client import ClientSession


class Wrapper:

    def __init__(self, session):
        self._session = session

    async def get(self, url):
        try:
            async with self._session.get(url, timeout=20) as resp:
                return await resp.text()
        except Exception as e:
            print(e)


loop = asyncio.get_event_loop()
wrapper = Wrapper(ClientSession())

responses = loop.run_until_complete(
    asyncio.gather(
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com'),
        wrapper.get('http://google.com')
    )
)

print(responses)
like image 41
Yurii Kramarenko Avatar answered Oct 12 '22 13:10

Yurii Kramarenko