How to run tasks concurrently in asyncio?

I'm trying to learn how to run tasks concurrently using Python's asyncio module. In the code below, I've written a mock "web crawler" as an example. I want at most two active fetch() requests at any given time, and I want process() to be called during the sleep() period.

import asyncio

class Crawler():

    urlq = ['http://www.google.com', 'http://www.yahoo.com', 
            'http://www.cnn.com', 'http://www.gamespot.com', 
            'http://www.facebook.com', 'http://www.evergreen.edu']

    htmlq = []
    MAX_ACTIVE_FETCHES = 2
    active_fetches = 0

    def __init__(self):
        pass

    async def fetch(self, url):
        self.active_fetches += 1
        print("Fetching URL: " + url);
        await(asyncio.sleep(2))
        self.active_fetches -= 1
        self.htmlq.append(url)

    async def crawl(self):
        while self.active_fetches < self.MAX_ACTIVE_FETCHES:
            if self.urlq:
                url = self.urlq.pop()
                task = asyncio.create_task(self.fetch(url))
                await task
            else:
                print("URL queue empty")
                break

    def process(self, page):
        print("processed page: " + page)

# main loop

c = Crawler()
while c.urlq:
    asyncio.run(c.crawl())
    while c.htmlq:
        page = c.htmlq.pop()
        c.process(page)

However, the code above downloads the URLs one by one (not two at a time) and doesn't do any "processing" until after all the URLs have been fetched. How can I make the fetch() tasks run concurrently, and have process() run in between, during the sleep() periods?

2 Answers

Your crawl method awaits each task right after creating it, so the fetches effectively run one at a time. Create the tasks first and then gather them:

async def crawl(self):
    tasks = []
    while self.active_fetches < self.MAX_ACTIVE_FETCHES:
        if self.urlq:
            url = self.urlq.pop()
            tasks.append(asyncio.create_task(self.fetch(url)))
        else:
            break
    await asyncio.gather(*tasks)
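
To see the difference this makes, here is a small standalone timing sketch (the slow/sequential/concurrent names are mine, just for illustration): gathering two two-second sleeps finishes in roughly two seconds, while awaiting them one after the other takes roughly four.

import asyncio
import time

async def slow(name):
    # stand-in for fetch(): just sleeps for two seconds
    await asyncio.sleep(2)
    return name

async def sequential():
    # awaiting each coroutine as soon as it is created runs them one at a time
    await slow("a")
    await slow("b")

async def concurrent():
    # gather schedules both coroutines and waits for them together
    await asyncio.gather(slow("a"), slow("b"))

for coro in (sequential, concurrent):
    start = time.perf_counter()
    asyncio.run(coro())
    print(coro.__name__, "took", round(time.perf_counter() - start, 1), "seconds")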

EDIT: Here's a cleaner, commented version that fetches and processes concurrently while still capping the maximum number of simultaneous fetchers.

import asyncio

class Crawler:

    def __init__(self, urls, max_workers=2):
        self.urls = urls
        # queue of URLs waiting to be fetched; the concurrency cap comes from
        # the number of workers, not from the queue itself
        self.fetching = asyncio.Queue()
        self.max_workers = max_workers

    async def crawl(self):
        # DON'T await here; start the workers consuming from the queue while
        # execution of this function continues. Each of the max_workers
        # worker coroutines both fetches pages and processes them.
        all_the_coros = asyncio.gather(
            *[self._worker(i) for i in range(self.max_workers)])

        # place all URLs on the queue
        for url in self.urls:
            await self.fetching.put(url)

        # now put one None per worker on the queue as a signal that there
        # are no more URLs to fetch
        for _ in range(self.max_workers):
            await self.fetching.put(None)

        # now make sure everything is done
        await all_the_coros

    async def _worker(self, i):
        while True:
            url = await self.fetching.get()
            if url is None:
                # this coroutine is done; simply return to exit
                return

            print(f'Fetch worker {i} is fetching a URL: {url}')
            page = await self.fetch(url)
            self.process(page)

    async def fetch(self, url):
        print("Fetching URL: " + url);
        await asyncio.sleep(2)
        return f"the contents of {url}"

    def process(self, page):
        print("processed page: " + page)


# main loop
c = Crawler(['http://www.google.com', 'http://www.yahoo.com', 
             'http://www.cnn.com', 'http://www.gamespot.com', 
             'http://www.facebook.com', 'http://www.evergreen.edu'])
asyncio.run(c.crawl())
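
A note on the design choice: the None sentinels are one way to tell the workers to stop. An alternative sketch (mine, not part of the original answer) uses asyncio.Queue's join()/task_done() pair instead, so crawl() simply waits until every queued URL has been processed and then cancels the workers. These two methods would replace crawl and _worker in the class above:

    async def crawl(self):
        workers = [asyncio.create_task(self._worker(i))
                   for i in range(self.max_workers)]
        for url in self.urls:
            await self.fetching.put(url)
        await self.fetching.join()   # wait until every URL has been task_done()
        for w in workers:            # the workers loop forever, so cancel them
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)

    async def _worker(self, i):
        while True:
            url = await self.fetching.get()
            print(f'Fetch worker {i} is fetching a URL: {url}')
            page = await self.fetch(url)
            self.process(page)
            self.fetching.task_done()  # lets join() see this URL as finished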

You can make htmlq an asyncio.Queue() and change the htmlq.append(url) call in fetch() to await self.htmlq.put(url). Then your main can be async, like this:

async def main():
    c = Crawler()
    asyncio.create_task(c.crawl())
    while True:
        page = await c.htmlq.get()
        if page is None:
            break
        c.process(page)

Your top-level code boils down to a call to asyncio.run(main()).

Once you are done with crawling, crawl() can enqueue None to notify the main coroutine that the work is done.
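
Putting those pieces together, here is one way the full program could look. This is my sketch of the approach described above, not the answerer's code, and the asyncio.Semaphore used to keep at most two fetches in flight is my own addition (the first answer shows a worker-pool way to get the same cap):

import asyncio

class Crawler:

    MAX_ACTIVE_FETCHES = 2

    def __init__(self):
        self.urlq = ['http://www.google.com', 'http://www.yahoo.com',
                     'http://www.cnn.com', 'http://www.gamespot.com',
                     'http://www.facebook.com', 'http://www.evergreen.edu']
        self.htmlq = asyncio.Queue()       # fetched pages, consumed by main()

    async def fetch(self, url):
        print("Fetching URL: " + url)
        await asyncio.sleep(2)
        await self.htmlq.put(url)          # was: self.htmlq.append(url)

    async def crawl(self):
        # an asyncio.Semaphore (my addition) keeps at most two fetches active
        sem = asyncio.Semaphore(self.MAX_ACTIVE_FETCHES)

        async def bounded_fetch(url):
            async with sem:
                await self.fetch(url)

        await asyncio.gather(*(bounded_fetch(url) for url in self.urlq))
        await self.htmlq.put(None)         # signal main() that crawling is done

    def process(self, page):
        print("processed page: " + page)

async def main():
    c = Crawler()
    # keep a reference so the crawl task isn't garbage-collected mid-run
    crawl_task = asyncio.create_task(c.crawl())
    while True:
        page = await c.htmlq.get()
        if page is None:
            break
        c.process(page)
    await crawl_task

asyncio.run(main())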
