Picking up items progressively as soon as a worker is available

I am looking for a solid implementation that lets me progressively work through a list of items using a Queue.

The idea is that I want to use a set number of workers that will go through a list of 20+ database-intensive tasks and return the results. I want Python to start with the first five items and, as soon as it is done with one task, start on the next task in the queue.

This is how I am currently doing it without Threading.

for key, v in self.sources.iteritems():
    # Do Stuff

I would like a similar approach, but ideally without having to split the list into subgroups of five, so that it automatically picks up the next item in the list. The goal is to make sure that if one database is slowing down the process, it does not have a negative impact on the whole application.

asked Jan 22 '13 by eandersson

3 Answers

You can implement this yourself, but Python 3 already comes with an Executor-based solution for thread management, which you can use in Python 2.x by installing the backported futures package from PyPI (pip install futures).

Your code could then look like

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # submit every task up front; the executor runs at most five at a time
    future_to_key = {}
    for key, value in sources.items():
        future_to_key[executor.submit(do_stuff, value)] = key
    # as_completed yields each future as soon as it finishes
    for future in concurrent.futures.as_completed(future_to_key):
        key = future_to_key[future]
        result = future.result()
        # process result
answered Nov 02 '22 by Katriel

If you are using Python 3, I recommend the concurrent.futures module. If you are not on Python 3 and are not attached to threads (versus processes), then you might try multiprocessing.Pool (though it comes with some caveats, and I have had trouble with pools not closing properly in my applications). If you must use threads in Python 2, you may end up writing the code yourself: spawn five threads running consumer functions and push the calls (function + args) onto a queue for the consumers to pick up and process, as sketched below.
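
A minimal sketch of that last pattern, assuming the same sources dictionary and do_stuff() helper used in the question and the first answer (Python 2's Queue module; in Python 3 it is named queue):

import threading
from Queue import Queue  # in Python 3: from queue import Queue

def worker(task_queue, results):
    # each consumer thread pulls the next (key, value) pair as soon as it is free
    while True:
        item = task_queue.get()
        if item is None:  # sentinel value: no more work
            task_queue.task_done()
            break
        key, value = item
        results[key] = do_stuff(value)  # the database-intensive call
        task_queue.task_done()

task_queue = Queue()
results = {}
threads = [threading.Thread(target=worker, args=(task_queue, results))
           for _ in range(5)]
for t in threads:
    t.start()

# push every call's arguments onto the queue; idle workers pick them up immediately
for key, value in sources.iteritems():
    task_queue.put((key, value))

# one sentinel per worker so every thread exits cleanly
for _ in threads:
    task_queue.put(None)

for t in threads:
    t.join()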

answered Nov 02 '22 by Mayur Patel


You could do it using only stdlib:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # use threads

def db_task(key_value):
    try:
        key, value = key_value
        result = do_stuff(value)  # stands in for the real database-intensive work
        return result, None
    except Exception as e:
        return None, e

def main():
    pool = Pool(5)  # five worker threads
    # imap_unordered yields each result as soon as its task finishes,
    # so one slow database does not hold up the others
    for result, error in pool.imap_unordered(db_task, sources.items()):
        if error is None:
            print(result)

if __name__ == "__main__":
    main()
answered Nov 02 '22 by jfs