
Fetching data with Python's asyncio in a sequential order

I have a Python 2.7 program which pulls data from websites and dumps the results to a database. It follows the producer-consumer model and is written using the threading module.

Just for fun I would like to rewrite this program using the new asyncio module (from 3.4) but I cannot figure out how to do this properly.

The most crucial requirement is that the program must fetch data from the same website in sequential order. For example, for the URL 'http://a-restaurant.com' it should first get 'http://a-restaurant.com/menu/0', then 'http://a-restaurant.com/menu/1', then 'http://a-restaurant.com/menu/2', ... If they are not fetched in order, the website stops delivering pages altogether and you have to start again from 0.

However, a fetch for another website ('http://another-restaurant.com') can (and should) run at the same time (the other sites have the same sequential restriction).

The threading module suits this well, as I can create a separate thread for each website, and each thread can wait until one page has finished loading before fetching the next one.

Here's a grossly simplified code snippet from the threading version (Python 2.7):

import threading
import urllib2
import Queue

class FetchThread(threading.Thread):
    def __init__(self, queue, url):
        threading.Thread.__init__(self)
        self.queue = queue
        self.baseurl = url
    ...
    def run(self):
        # Get 10 menu pages in sequential order
        for food in range(10):
            url = self.baseurl + '/' + str(food)
            text = urllib2.urlopen(url).read()
            self.queue.put(text)
            ...

def main():
    queue = Queue.Queue()
    urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
    for url in urls:
        fetcher = FetchThread(queue, url)
        fetcher.start()
        ...

And here's how I tried to do it with asyncio (in 3.4.1):

import asyncio
import aiohttp

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)


loop = asyncio.get_event_loop()
l = []
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
for url in urls:
    for food in range(10):
        menu_url = url + '/' + str(food)
        l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))

And it fetches and prints everything in a non-sequential order. Well, I guess that's the whole idea of those coroutines. Should I not use aiohttp and just fetch with urllib? But would the fetches for the first restaurant then block the fetches for the other restaurants? Am I just thinking about this completely wrong? (This is just a test to try to fetch things in sequential order; I haven't got to the queue part yet.)

1 Answer

Your current code will work fine for the restaurant that doesn't care about sequential ordering of requests. All ten requests for the menu will run concurrently, and will print to stdout as soon as they're complete.

Obviously, this won't work for the restaurant that requires sequential requests. You need to refactor a bit for that to work:

import asyncio
import aiohttp

@asyncio.coroutine
def fetch(url):
    response = yield from aiohttp.request('GET', url)
    response = yield from response.read_and_close()
    return response.decode('utf-8')

@asyncio.coroutine
def print_page(url):
    page = yield from fetch(url)
    print(page)

@asyncio.coroutine
def print_pages_sequential(url, num_pages):
    for food in range(num_pages):
        menu_url = url + '/' + str(food)
        yield from print_page(menu_url)

loop = asyncio.get_event_loop()

l = [print_pages_sequential('http://a-restaurant.com/menu', 10)]

conc_url = 'http://another-restaurant.com/menu'
for food in range(10):
    menu_url = conc_url + '/' + str(food)
    l.append(print_page(menu_url))

loop.run_until_complete(asyncio.wait(l))

Instead of adding all ten requests for the sequential restaurant to the list, we add one coroutine to the list which iterates over all ten pages sequentially. The way this works is that yield from print_page suspends the execution of print_pages_sequential until the print_page request is complete, but it does so without blocking any other coroutines that are running concurrently (like all the print_page calls you append to l).

By doing it this way, all of your "another-restaurant" requests can run completely concurrently, just like you want, and your "a-restaurant" requests will run sequentially, but without blocking any of the "another-restaurant" requests.
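If it helps to see that suspension behavior in isolation, here is a minimal sketch (my own illustration, not part of the code above: asyncio.sleep stands in for the HTTP requests, and the names sequential_worker and concurrent_worker are made up):

import asyncio

@asyncio.coroutine
def sequential_worker(name, num_steps):
    # Each step must finish before the next one starts, like the menu pages
    for step in range(num_steps):
        yield from asyncio.sleep(0.1)   # stand-in for one HTTP request
        print(name, 'finished step', step)

@asyncio.coroutine
def concurrent_worker(name, delay):
    # An independent task that keeps running while sequential_worker is suspended
    yield from asyncio.sleep(delay)     # stand-in for one HTTP request
    print(name, 'done')

loop = asyncio.get_event_loop()
tasks = [sequential_worker('a-restaurant', 3),
         concurrent_worker('another-restaurant/0', 0.15),
         concurrent_worker('another-restaurant/1', 0.25)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

The output interleaves the concurrent_worker lines between the sequential_worker steps: while sequential_worker sits in yield from asyncio.sleep, the event loop is free to run the other coroutines, which is exactly what print_pages_sequential relies on.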

Edit:

If all the sites have the same sequential fetching requirement, the logic can be simplified further:

l = []
urls = ["http://a-restaurant.com/menu", "http://another-restaurant.com/menu"]
for url in urls:
    l.append(print_pages_sequential(url, 10))

loop.run_until_complete(asyncio.wait(l))
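
And since the question mentions that the real program hands the fetched pages to a consumer via a queue, here is a rough sketch of how that part could look with asyncio.Queue (my own addition, not part of the answer above; it reuses the fetch() coroutine and loop from the earlier snippets, and the produce_pages / consume_pages names and the None sentinel are just assumptions):

@asyncio.coroutine
def produce_pages(queue, base_url, num_pages):
    # Producer: fetch one site's pages in strict order and hand each to the consumer
    for food in range(num_pages):
        page = yield from fetch(base_url + '/' + str(food))
        yield from queue.put(page)
    yield from queue.put(None)   # sentinel: this producer is finished

@asyncio.coroutine
def consume_pages(queue, num_producers):
    # Consumer: replace print() with the real database write
    finished = 0
    while finished < num_producers:
        page = yield from queue.get()
        if page is None:
            finished += 1
        else:
            print(len(page))

queue = asyncio.Queue()
urls = ('http://a-restaurant.com/menu', 'http://another-restaurant.com/menu')
tasks = [produce_pages(queue, url, 10) for url in urls]
tasks.append(consume_pages(queue, len(urls)))
loop.run_until_complete(asyncio.wait(tasks))

Each producer still fetches its own site strictly in order, the producers run concurrently with each other, and the consumer drains the queue as pages arrive.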