Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to chain futures in a non-blocking manner? That is, how to use one future as an input in another future without blocking?

Using the below example, how can future2 use the result of future1 once future1 is complete (without blocking future3 from being submitted)?

from concurrent.futures import ProcessPoolExecutor
import time

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()

s = time.time()
future1 = pool.submit(wait, 5)
future2 = pool.submit(wait, future1.result())
future3 = pool.submit(wait, 10)

time_taken = time.time() - s
print(time_taken)
like image 724
Greg Avatar asked Aug 28 '17 15:08

Greg


1 Answers

This is achievable by carefully crafting a callback to submit the second operation after the first one has completed. Sadly, it is not possible to pass an arbitrary future to pool.submit so an extra step is required to bind the two futures together.

Here is a possible implementation:

import concurrent.futures

def copy_future_state(source, destination):
    if source.cancelled():
        destination.cancel()
    if not destination.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        destination.set_exception(exception)
    else:
        result = source.result()
        destination.set_result(result)


def chain(pool, future, fn):
    result = concurrent.futures.Future()

    def callback(_):
        try:
            temp = pool.submit(fn, future.result())
            copy = lambda _: copy_future_state(temp, result)
            temp.add_done_callback(copy)
        except:
            result.cancel()
            raise

    future.add_done_callback(callback)
    return result

Note that copy_future_state is a slightly modified version of asyncio.futures._set_concurrent_future_state.

Usage:

from concurrent.futures import ProcessPoolExecutor

def wait(seconds):
    time.sleep(seconds)
    return seconds

pool = ProcessPoolExecutor()
future1 = pool.submit(wait, 5)
future2 = chain(pool, future1, wait)
future3 = pool.submit(wait, 10)
like image 157
Vincent Avatar answered Oct 07 '22 03:10

Vincent