Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Dynamically add to list of what Python asyncio's event loop should execute

I've got a function download_all that iterates through a hardcoded list of pages to download them all in sequence. But if I'd like to dynamically add to the list based on the results of a page, how can I do it? For example download the first page, parse it, and based on the results add others to the event loop.

@asyncio.coroutine
def download_all():
    first_page = 1
    last_page = 100
    download_list = [download(page_number) for page_number in range(first_page, last_page)]
    gen = asyncio.wait(download_list)
    return gen

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = loop.run_until_complete(download_all())
like image 255
user779159 Avatar asked Jan 23 '15 17:01

user779159


1 Answers

One way to accomplish this is by using a Queue.

#!/usr/bin/python3

import asyncio

try:  
    # python 3.4
    from asyncio import JoinableQueue as Queue
except:  
    # python 3.5
    from asyncio import Queue

@asyncio.coroutine
def do_work(task_name, work_queue):
    while not work_queue.empty():
        queue_item = work_queue.get_nowait()

        # simulate condition where task is added dynamically
        if queue_item % 2 != 0:
            work_queue.put_nowait(2)
            print('Added additional item to queue')

        print('{0} got item: {1}'.format(task_name, queue_item))
        yield from asyncio.sleep(queue_item)
        print('{0} finished processing item: {1}'.format(task_name, queue_item))

if __name__ == '__main__':

    queue = Queue()

    # Load initial jobs into queue
    [queue.put_nowait(x) for x in range(1, 6)] 

    # use 3 workers to consume tasks
    taskers = [ 
        do_work('task1', queue),
        do_work('task2', queue),
        do_work('task3', queue)
    ]   

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(taskers))
    loop.close()

Using a queue from asyncio you can ensure that the "units" of work are separate from the tasks/futures that are given to asyncio's event loop initially. Basically this allows for the addition of extra "units" of work given some condition.

Note that in the example above even numbered tasks are terminal so an additional task is not added if that is the case. This eventually results in the completion of all tasks, but in your case you could easily use another condition to determine whether another item is added to the queue or not.

Output:

Added additional item to queue
task2 got item: 1
task1 got item: 2
Added additional item to queue
task3 got item: 3
task2 finished processing item: 1
task2 got item: 4
task1 finished processing item: 2
Added additional item to queue
task1 got item: 5
task3 finished processing item: 3
task3 got item: 2
task3 finished processing item: 2
task3 got item: 2
task2 finished processing item: 4
task2 got item: 2
task1 finished processing item: 5
task3 finished processing item: 2
task2 finished processing item: 2
like image 189
trendsetter37 Avatar answered Oct 20 '22 11:10

trendsetter37