I am looking for a solid implementation to allow me to progressively work through a list of items using Queue.
The idea is that I want to use a set number of workers that will go through a list of 20+ database-intensive tasks and return the results. I want Python to start with the first five items and, as soon as it is done with one task, start on the next task in the queue.
This is how I am currently doing it without Threading.
for key, v in self.sources.iteritems():
    # Do Stuff
I would like to have a similar approach, but without having to split the list up into subgroups of five, so that the next item in the list is picked up automatically. The goal is to make sure that if one database is slowing down the process, it will not have a negative impact on the whole application.
You can implement this yourself, but Python 3 already comes with an Executor-based solution for thread management (the concurrent.futures module), which you can use in Python 2.x by installing the backported version.
Your code could then look like this:
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Map each submitted future back to the key it was created for.
    future_to_key = {}
    for key, value in sources.items():
        future_to_key[executor.submit(do_stuff, value)] = key
    # as_completed yields futures as they finish, not in submission order.
    for future in concurrent.futures.as_completed(future_to_key):
        key = future_to_key[future]
        result = future.result()
        # process result
If you are using Python 3, I recommend the concurrent.futures module. If you are not using Python 3 and are not attached to threads (versus processes), then you might try multiprocessing.Pool (though it comes with some caveats, and I have had trouble with pools not closing properly in my applications). If you must use threads in Python 2, you might end up writing the code yourself: spawn 5 threads running consumer functions and simply push the calls (function + args) into the queue iteratively for the consumers to find and process them.
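The hand-rolled variant described above (a fixed number of consumer threads pulling calls off a queue) could be sketched roughly as follows. This is only an illustration under assumptions: the names worker, run_all, slow_square and the _STOP sentinel are made up for the example, and slow_square merely stands in for a real database task. The import fallback is there so the same file runs on both Python 2 and Python 3.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

_STOP = object()  # hypothetical sentinel: tells a worker thread to exit


def worker(tasks, results):
    """Consume (key, func, args) calls until the sentinel arrives."""
    while True:
        item = tasks.get()
        if item is _STOP:
            break
        key, func, args = item
        try:
            results.put((key, func(*args), None))
        except Exception as exc:
            # Report the failure instead of killing the worker thread.
            results.put((key, None, exc))


def run_all(calls, num_workers=5):
    """Run every (key, func, args) call on a fixed set of threads.

    Returns a dict mapping key -> (result, error).
    """
    tasks = queue.Queue()
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()

    count = 0
    for call in calls:
        tasks.put(call)  # an idle worker picks this up immediately
        count += 1
    for _ in threads:
        tasks.put(_STOP)  # one sentinel per worker
    for t in threads:
        t.join()

    collected = {}
    for _ in range(count):
        key, value, error = results.get()
        collected[key] = (value, error)
    return collected


def slow_square(x):
    # Hypothetical stand-in for a database-heavy task.
    return x * x


if __name__ == "__main__":
    calls = [(n, slow_square, (n,)) for n in range(20)]
    print(run_all(calls))
```

Because each worker takes the next call as soon as it finishes the previous one, a single slow task only ties up one of the five threads; the other four keep draining the queue.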
You could do it using only stdlib:
#!/usr/bin/env python
from multiprocessing.dummy import Pool  # use threads

def db_task(key_value):
    try:
        key, value = key_value
        # compute result..
        return result, None
    except Exception as e:
        return None, e

def main():
    pool = Pool(5)
    for result, error in pool.imap_unordered(db_task, sources.items()):
        if error is None:
            print(result)

if __name__ == "__main__":
    main()