
Python aiohttp/asyncio - how to process returned data

I'm in the process of moving some synchronous code to asyncio using aiohttp. The synchronous code takes 15 minutes to run, so I'm hoping to improve on this.

I have some working code that fetches data from some URLs and returns the body of each. But this runs against just one lab site; I have 70+ actual sites.

So if I built a loop to create a list of all the URLs for all sites, that would make about 700 URLs in a list to be processed. I don't think processing them is a problem?

But I'm not sure how to program doing 'stuff' with the results. I already have code that will do 'stuff' to each of the results that are returned, but I'm not sure how to apply it to the right type of result.

When the code runs, does it process all the URLs and, depending on how long each one takes, return the results in an unknown order?

Do I need a function that will process any type of result?

import asyncio, aiohttp, ssl
from bs4 import BeautifulSoup

def page_content(page):
    return BeautifulSoup(page, 'html.parser')


async def fetch(session, url):
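    # aiohttp.Timeout is gone from current aiohttp releases;
    # a ClientTimeout passed to the request does the same job.
    timeout = aiohttp.ClientTimeout(total=15)
    async with session.get(url, timeout=timeout) as response:
        return page_content(await response.text())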

async def get_url_data(urls, username, password):
    tasks = []
    # Fetch all responses within one Client session,
    # keep connection alive for all requests.
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session:
        for i in urls:
            task = asyncio.ensure_future(fetch(session, i))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        # you now have all response bodies in this variable
        for i in responses:
            print(i.title.text)
        return responses


def main():
    username = 'monitoring'
    password = '*********'
    ip = '10.10.10.2'
    urls = [
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'),
        'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'),
        'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'),
        ]
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(get_url_data(urls,username,password))
    data = loop.run_until_complete(future)
    print(data)

if __name__ == "__main__":
    main()
AlexW asked Aug 30 '17 10:08


1 Answer

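Here's an example with concurrent.futures.ProcessPoolExecutor: the pages are fetched concurrently on the event loop (IO-bound), and each body is handed to a pool of worker processes for parsing (CPU-bound). If the pool is created without specifying max_workers, it defaults to the number of processors reported by os.cpu_count(). Also note that asyncio.wrap_future, which turns the executor's future into an awaitable, is a public function (it is listed in the current asyncio documentation). Alternatively, there's AbstractEventLoop.run_in_executor.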

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiohttp
import lxml.html


def process_page(html):
    '''Meant for CPU-bound workload'''
    tree = lxml.html.fromstring(html)
    return tree.find('.//title').text


async def fetch_page(url, session):
    '''Meant for IO-bound workload'''
    timeout = aiohttp.ClientTimeout(total=15)
    async with session.get(url, timeout=timeout) as res:
        return await res.text()


async def process(url, session, pool):
    html = await fetch_page(url, session)
    return await asyncio.wrap_future(pool.submit(process_page, html))


async def dispatch(urls):
    # The context manager shuts the worker processes down when done
    with ProcessPoolExecutor() as pool:
        async with aiohttp.ClientSession() as session:
            coros = (process(url, session, pool) for url in urls)
            return await asyncio.gather(*coros)


def main():
    urls = [
        'https://stackoverflow.com/',
        'https://serverfault.com/',
        'https://askubuntu.com/',
        'https://unix.stackexchange.com/'
    ]
    result = asyncio.run(dispatch(urls))
    print(result)

if __name__ == '__main__':
    main()
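
On the ordering question: asyncio.gather returns its results in the same order as the awaitables passed to it, whatever order the requests actually finish in, so each result can be paired with its URL and routed to the right handler. A minimal sketch of that idea, assuming a stand-in fake_fetch in place of a real aiohttp request and hypothetical handle_ping / handle_traceroute handlers (none of these names come from the original code):

```python
import asyncio


async def fake_fetch(url, delay):
    """Stand-in for a real aiohttp request (hypothetical helper)."""
    await asyncio.sleep(delay)
    return 'body of ' + url


def handle_ping(url, body):
    """Hypothetical handler for ping output."""
    return ('ping', url, body)


def handle_traceroute(url, body):
    """Hypothetical handler for traceroute output."""
    return ('traceroute', url, body)


def pick_handler(url):
    """Choose a handler from the URL itself, so completion order is irrelevant."""
    return handle_ping if '/ping/' in url else handle_traceroute


async def run_all(jobs):
    urls = [url for url, _ in jobs]
    # gather() returns results in the order the awaitables were passed in,
    # regardless of which one finished first. With return_exceptions=True a
    # failed request shows up as an exception object instead of aborting all.
    results = await asyncio.gather(
        *(fake_fetch(url, delay) for url, delay in jobs),
        return_exceptions=True,
    )
    processed = []
    for url, body in zip(urls, results):
        if isinstance(body, Exception):
            processed.append(('error', url, repr(body)))
        else:
            processed.append(pick_handler(url)(url, body))
    return processed


if __name__ == '__main__':
    jobs = [
        ('http://host/exec/-/ping/10.10.0.1', 0.03),        # finishes last
        ('http://host/exec/-/traceroute/10.10.0.1', 0.01),  # finishes first
    ]
    for item in asyncio.run(run_all(jobs)):
        print(item)
```

Because the handler is picked from the URL rather than from the arrival order, the same loop works for 10 URLs or 700; a single catch-all function is only needed if the result type cannot be told from the request.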
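
The run_in_executor alternative mentioned above could look roughly like this. It is a sketch rather than the answer's own code: count_words is a hypothetical stand-in for the CPU-bound parsing step, and process_all is an assumed name.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_words(html):
    """Stand-in for CPU-bound parsing (hypothetical helper)."""
    return len(html.split())


async def process_all(pages, pool):
    loop = asyncio.get_running_loop()
    # run_in_executor() submits the call to the pool and returns an
    # awaitable directly, so no explicit wrap_future() is needed.
    futures = [loop.run_in_executor(pool, count_words, page) for page in pages]
    return await asyncio.gather(*futures)


def main():
    pages = ['<p>one two</p>', '<p>three</p>']
    with ProcessPoolExecutor() as pool:
        print(asyncio.run(process_all(pages, pool)))


if __name__ == '__main__':
    main()
```

Any concurrent.futures executor can be passed as pool; a ThreadPoolExecutor would suit blocking IO, while the process pool suits parsing that holds the GIL.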
saaj answered Sep 29 '22 13:09
