
How do I wait for ThreadPoolExecutor.map to finish

I have the following code, which has been simplified:

import concurrent.futures

pool = concurrent.futures.ThreadPoolExecutor(8)

def _exec(x):
    return x + x

myfuturelist = pool.map(_exec,[x for x in range(5)])

# How do I wait for my futures to finish?

for result in myfuturelist:
    # Is this how it's done?
    print(result)

#... stuff that should happen only after myfuturelist is
#completely resolved.
# Documentation says pool.map is asynchronous

The documentation is weak regarding ThreadPoolExecutor.map. Help would be great.

Thanks!

Dr.Knowitall asked Dec 13 '25 16:12


2 Answers

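ThreadPoolExecutor.map returns immediately with a lazy iterator; the call itself does not block until all of its tasks are complete. If you want to block explicitly, submit the tasks with submit, which gives you Future objects, and pass those to wait: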

from concurrent.futures import wait, ALL_COMPLETED
...

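# fn and arg_list are placeholders for your task function and its inputs
futures = [pool.submit(fn, arg) for arg in arg_list]
# timeout=None (wait indefinitely) and ALL_COMPLETED are both the defaults
wait(futures, timeout=None, return_when=ALL_COMPLETED)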
do_other_stuff()
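
A self-contained version of the submit-and-wait pattern might look like the sketch below. The names double and run_and_wait are hypothetical, chosen for illustration; double stands in for _exec from the question.

```python
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


def double(x):
    # Hypothetical stand-in for _exec from the question.
    return x + x


def run_and_wait(values, max_workers=8):
    """Submit one task per value, block until all finish, return results in input order."""
    pool = ThreadPoolExecutor(max_workers=max_workers)
    try:
        futures = [pool.submit(double, v) for v in values]
        # wait() returns two sets: futures that finished and futures that did not.
        done, not_done = wait(futures, timeout=None, return_when=ALL_COMPLETED)
        assert not not_done  # with timeout=None nothing can be left pending
        # result() re-raises any exception the task raised.
        return [f.result() for f in futures]
    finally:
        pool.shutdown()


results = run_and_wait(range(5))
print(results)  # [0, 2, 4, 6, 8]
```

Reading the results from the original futures list, rather than from the done set, keeps them in submission order.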

You could also call list(results) on the generator returned by pool.map to force evaluation; iterating over it with a for loop, as in your original example, has the same effect and blocks until every result is available. If you're not actually using the values returned from the tasks, though, wait is the way to go.
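
To illustrate consuming the iterator returned by map, here is a minimal sketch. The task functions double and fail_on_three are hypothetical. It also shows that an exception raised inside a task is re-raised when its result is reached during iteration.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x):
    return x + x


def fail_on_three(x):
    # Hypothetical task that fails for one input.
    if x == 3:
        raise ValueError("bad input: 3")
    return x


with ThreadPoolExecutor(max_workers=4) as pool:
    # list() drains the iterator, so this line blocks until all tasks are done.
    # Results come back in input order, regardless of completion order.
    doubled = list(pool.map(double, range(5)))

    caught = None
    try:
        list(pool.map(fail_on_three, range(5)))
    except ValueError as exc:
        # The task's exception surfaces here, at iteration time.
        caught = str(exc)

print(doubled)  # [0, 2, 4, 6, 8]
print(caught)   # bad input: 3
```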

mway answered Dec 15 '25 09:12


It's true that Executor.map() does not wait for all futures to finish, because it returns a lazy iterator, as @MisterMiyagi said.

But we can get the waiting behavior by using a with statement:

import time

from concurrent.futures import ThreadPoolExecutor

def hello(i):
    time.sleep(i)
    print(i)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(hello, [1, 2, 3])
print("finish")

# output
# 1
# 2
# 3
# finish

As you can see, finish is printed after 1, 2, 3. This works because Executor defines an __exit__() method that calls shutdown(wait=True) when the with block ends:

def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
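
Since __exit__ only calls shutdown(wait=True), the same effect should be available without a with block, for example with a module-level pool like the one in the question. The sketch below assumes a hypothetical task named record. It also shows that the pool rejects new work once it has been shut down.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []


def record(i):
    # Hypothetical task; list.append is safe to call from several threads in CPython.
    time.sleep(0.01)
    finished.append(i)


pool = ThreadPoolExecutor(max_workers=2)
pool.map(record, range(4))   # returns immediately
pool.shutdown(wait=True)     # blocks until all four tasks have run

print(sorted(finished))  # [0, 1, 2, 3]

rejected = False
try:
    pool.submit(record, 99)
except RuntimeError:
    # A shut-down executor refuses new tasks.
    rejected = True
print(rejected)  # True
```

The trade-off is that a pool shut down this way cannot be reused, so this fits best when the pool's lifetime matches one batch of work.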

The shutdown method of ThreadPoolExecutor is:

def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            # Drain all work items from the queue, and then cancel their
            # associated futures.
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()

        # Send a wake-up to prevent threads calling
        # _work_queue.get(block=True) from permanently blocking.
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__

So with wait=True, shutdown joins every worker thread, and leaving the with block therefore waits until all submitted tasks have finished.
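
One caveat with this approach: if the iterator returned by map is discarded, return values are lost and exceptions raised inside tasks go unnoticed. A minimal sketch, using a hypothetical task slow_square, that keeps the iterator and reads it after the with block:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(i):
    # Hypothetical task: sleeps briefly, fails for i == 2.
    time.sleep(0.05 * i)
    if i == 2:
        raise RuntimeError("task 2 failed")
    return i * i


with ThreadPoolExecutor(max_workers=2) as executor:
    results_iter = executor.map(slow_square, [1, 2, 3])
# All tasks have finished here, but no exception has been raised yet.

collected = []
error = None
try:
    for value in results_iter:
        collected.append(value)
except RuntimeError as exc:
    # The failure from task 2 only appears once its result is requested.
    error = str(exc)

print(collected)  # [1]
print(error)      # task 2 failed
```

Iteration stops at the first failing task, so the result of task 3 is never yielded even though the task itself ran to completion.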

milkice answered Dec 15 '25 08:12