Using the example below, how can future2 use the result of future1 once future1 is complete (without blocking future3 from being submitted)?
from concurrent.futures import ProcessPoolExecutor
import time

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()
s = time.time()
future1 = pool.submit(wait, 5)
# future1.result() blocks here until future1 finishes, which delays future3
future2 = pool.submit(wait, future1.result())
future3 = pool.submit(wait, 10)
time_taken = time.time() - s
print(time_taken)
This is achievable by carefully crafting a callback that submits the second operation after the first one has completed. Sadly, pool.submit cannot take a pending future as an argument and resolve it for you, so an extra step is required to bind the two futures together.
Here is a possible implementation:
import concurrent.futures

def copy_future_state(source, destination):
    # Mirror the outcome of source (cancelled, exception or result) onto destination.
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        result = source.result()
        destination.set_result(result)

def chain(pool, future, fn):
    # Placeholder future returned to the caller right away.
    result = concurrent.futures.Future()

    def callback(_):
        try:
            # future is done at this point, so future.result() does not block.
            temp = pool.submit(fn, future.result())
            copy = lambda _: copy_future_state(temp, result)
            temp.add_done_callback(copy)
        except:
            # The first future failed or the submission failed: cancel the chained future.
            result.cancel()
            raise

    future.add_done_callback(callback)
    return result
Note that copy_future_state is a slightly modified version of asyncio.futures._set_concurrent_future_state.
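To see what copy_future_state does in isolation, here is a small sketch that calls it on hand-made Future objects, with no pool involved. The helper is repeated so the snippet runs on its own; the variable names (ok_source, bad_source, gone_source and so on) are only for illustration.

```python
from concurrent.futures import Future


def copy_future_state(source, destination):
    # Same helper as in the answer, repeated so this snippet is self-contained.
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        destination.set_result(source.result())


# Case 1: a successful source passes its result along.
ok_source, ok_dest = Future(), Future()
ok_source.set_result(42)
copy_future_state(ok_source, ok_dest)
print(ok_dest.result())

# Case 2: a failed source passes its exception along.
bad_source, bad_dest = Future(), Future()
bad_source.set_exception(ValueError("boom"))
copy_future_state(bad_source, bad_dest)
print(repr(bad_dest.exception()))

# Case 3: a cancelled source cancels the destination.
gone_source, gone_dest = Future(), Future()
gone_source.cancel()
copy_future_state(gone_source, gone_dest)
print(gone_dest.cancelled())
```

In each case the destination ends up in the same state as the source, which is what lets chain hand back a placeholder future before the real work has even been submitted.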
Usage:
from concurrent.futures import ProcessPoolExecutor
import time

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()
future1 = pool.submit(wait, 5)
# Returns immediately; wait(5) is submitted once future1 has completed.
future2 = chain(pool, future1, wait)
future3 = pool.submit(wait, 10)
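To check that the three submissions really do not block, the usage above can be timed. The sketch below is an illustration under a few assumptions: it uses ThreadPoolExecutor instead of ProcessPoolExecutor so that it runs without a `__main__` guard or pickling concerns, it shortens the sleeps, and the names submit_time, total_time and results are made up for the demo. chain only calls pool.submit, so the same code should work with a process pool as long as fn and its result can be pickled.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def copy_future_state(source, destination):
    # Mirror the outcome of source onto destination.
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        destination.set_result(source.result())


def chain(pool, future, fn):
    # Return a placeholder now; fill it in once fn(future.result()) has run.
    result = Future()

    def callback(_):
        try:
            temp = pool.submit(fn, future.result())
            temp.add_done_callback(lambda _: copy_future_state(temp, result))
        except BaseException:
            result.cancel()
            raise

    future.add_done_callback(callback)
    return result


def wait(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=4) as pool:
    start = time.time()
    future1 = pool.submit(wait, 0.2)
    future2 = chain(pool, future1, wait)  # wait(0.2) runs after future1 is done
    future3 = pool.submit(wait, 0.1)
    submit_time = time.time() - start     # measured before any result is awaited

    results = (future1.result(), future2.result(), future3.result())
    total_time = time.time() - start

print("submitted in", submit_time)
print("results", results)
print("finished in", total_time)
```

Here submit_time should be a small fraction of a second, because nothing waits on future1 during submission, while total_time covers both sleeps of the chained pair since future2 only starts after future1 finishes.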