I want something similar to executor.map, except that when I iterate over the results, I want them in order of completion: the work item that finished first should appear first in the iteration, and so on. That way, the iteration blocks if and only if none of the remaining work items has finished yet.
I know how to implement this myself using queues, but I'm wondering whether it's possible using the futures framework.
(I mostly used thread-based executors, so I'd like an answer that applies to these, but a general answer would be welcome as well.)
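For comparison, here is a minimal sketch of the queue-based approach mentioned above, assuming a thread-based executor. Each future pushes itself onto a queue.Queue when it finishes (via Future.add_done_callback), so the consumer blocks only until the next one is done. The names sleeper and results_in_completion_order are hypothetical, chosen for illustration.

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor


def sleeper(secs):
    # Hypothetical work item: sleep, then return the input.
    time.sleep(secs)
    return secs


def results_in_completion_order(executor, fn, iterable):
    """Hypothetical helper: yield fn(item) results in completion order."""
    done = queue.Queue()
    futures = [executor.submit(fn, item) for item in iterable]
    for fut in futures:
        # The callback receives the finished future and puts it on the queue.
        # If the future is already done, the callback runs immediately.
        fut.add_done_callback(done.put)
    for _ in range(len(futures)):
        # Blocks until *some* future has finished; result() re-raises errors.
        yield done.get().result()


with ThreadPoolExecutor(max_workers=3) as executor:
    ordered = list(results_in_completion_order(executor, sleeper, [0.3, 0.1, 0.2]))

print(ordered)
```

This is essentially what concurrent.futures.as_completed() does for you, which is why the answers below recommend it instead.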
UPDATE: Thanks for the answers! Can you please explain how I can use as_completed with executor.map? executor.map is the most useful and succinct tool for me when using futures, and I'd be reluctant to start using Future objects manually.
ThreadPoolExecutor is an Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Its main methods are submit(fn, *args, **kwargs), which schedules the callable and returns a Future object representing its execution state, and map(fn, *iterables, timeout=None, chunksize=1), which applies fn to the items of the iterables and returns an iterator over the results. If timeout is given and a result is not available within timeout seconds of the original call to map(), the iterator raises concurrent.futures.TimeoutError.
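A short sketch of both methods, including map()'s timeout behavior. The worker slow_square is a hypothetical function that sleeps for x/10 seconds; the timings are only there to make the ordering and the timeout observable.

```python
import concurrent.futures
import time


def slow_square(x):
    # Hypothetical work item that takes x/10 seconds.
    time.sleep(x / 10)
    return x * x


timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns a Future immediately; result() blocks until it is done.
    fut = executor.submit(slow_square, 3)
    single = fut.result()

    # map() yields results in input order, regardless of completion order.
    in_order = list(executor.map(slow_square, [3, 1, 2]))

    # The timeout is measured from the map() call, not per result.
    try:
        list(executor.map(slow_square, [5], timeout=0.1))
    except concurrent.futures.TimeoutError:
        timed_out = True

print(single, in_order, timed_out)
```

Note that the timed-out call is not interrupted: leaving the with block still waits for slow_square(5) to finish.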
executor.map(), like the builtin map(), only returns results in the order of the iterable, so unfortunately you can't use it to determine the order of completion. concurrent.futures.as_completed() is what you're looking for; here's an example:
import time
import concurrent.futures

times = [3, 1, 2]

def sleeper(secs):
    time.sleep(secs)
    print('I slept for {} seconds'.format(secs))
    return secs

# returns in the order given
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    print(list(executor.map(sleeper, times)))

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [3, 1, 2]

# returns in the order completed
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futs = [executor.submit(sleeper, secs) for secs in times]
    print([fut.result() for fut in concurrent.futures.as_completed(futs)])

# I slept for 1 seconds
# I slept for 2 seconds
# I slept for 3 seconds
# [1, 2, 3]
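One thing as_completed() loses compared with executor.map() is the link between a result and the input that produced it. A common pattern, sketched below, is to keep a dict from each future to its input (here the input's index); the sleeper function and the times list are illustrative.

```python
import concurrent.futures
import time


def sleeper(secs):
    time.sleep(secs)
    return secs


times = [0.3, 0.1, 0.2]
completed = []
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Remember which input index each future belongs to.
    fut_to_index = {executor.submit(sleeper, secs): i for i, secs in enumerate(times)}
    # as_completed() accepts any iterable of futures, including dict keys.
    for fut in concurrent.futures.as_completed(fut_to_index):
        completed.append((fut_to_index[fut], fut.result()))

print(completed)
```

Each entry is (input index, result), listed in the order the calls finished.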
Of course, if you really need a map interface, you could write your own map_as_completed() function that encapsulates the above (perhaps as a method on a subclass of Executor), but I think creating futures through executor.submit() is simpler and cleaner. It also lets you pass keyword arguments, or no arguments at all, which map() does not.
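A minimal sketch of the map_as_completed() idea from the answer, written as a plain function plus a hypothetical ThreadPoolExecutor subclass (CompletionOrderExecutor) that exposes it as a method. Like executor.map(), it submits every call up front; unlike executor.map(), it yields results in completion order.

```python
import concurrent.futures
import time


def map_as_completed(executor, fn, *iterables):
    """Hypothetical helper: like executor.map(), but in completion order."""
    futures = [executor.submit(fn, *args) for args in zip(*iterables)]
    for fut in concurrent.futures.as_completed(futures):
        yield fut.result()


class CompletionOrderExecutor(concurrent.futures.ThreadPoolExecutor):
    """Hypothetical subclass exposing the helper as a method."""

    def map_as_completed(self, fn, *iterables):
        return map_as_completed(self, fn, *iterables)


def sleeper(secs):
    time.sleep(secs)
    return secs


with CompletionOrderExecutor(max_workers=3) as executor:
    results = list(executor.map_as_completed(sleeper, [0.3, 0.1, 0.2]))

print(results)
```

If the caller stops iterating early, the remaining futures keep running; cancelling them would need extra bookkeeping.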
concurrent.futures.as_completed() returns an iterator that yields futures in order of completion, which sounds like exactly what you were looking for.
http://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.as_completed
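as_completed() also takes an optional timeout argument. A small sketch, assuming you want to stop waiting after a fixed deadline: the timeout counts from the call to as_completed(), and a TimeoutError is raised if not every future has finished by then. The 0.1 s and 1.0 s work items are illustrative.

```python
import concurrent.futures
import time


def sleeper(secs):
    time.sleep(secs)
    return secs


finished, timed_out = [], False
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futs = [executor.submit(sleeper, s) for s in [0.1, 1.0]]
    try:
        # The deadline covers the whole iteration, not each item.
        for fut in concurrent.futures.as_completed(futs, timeout=0.5):
            finished.append(fut.result())
    except concurrent.futures.TimeoutError:
        timed_out = True

print(finished, timed_out)
```

The futures that finished before the deadline are still processed; the slow one keeps running until the executor shuts down.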
Please let me know if anything is unclear or if you have difficulty with the implementation.