Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python executor spawn tasks from done callback (recursively submit tasks)

I'm trying to submit further tasks from result of a task that was done:

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(my_task)
    def callback(future):
        for another_task in future.result():
            future = executor.submit(another_task)
            future.add_done_callback(callback)
    future.add_done_callback(callback)

but I'm getting:

RuntimeError: cannot schedule new futures after shutdown

What's the best way to make the executor hold for the callback? A semaphore?

Ideally the solution should transfer well if ThreadPoolExecutor is replaced with ProcessPoolExecutor.

like image 660
Uri Avatar asked Aug 16 '18 14:08

Uri


2 Answers

The callback is executed in a separate thread. Therefore, when your callback logic starts executing, there's a high chance the main loop will leave the context manager shutting down the Executor. That's why you observe the RuntimeError.

The easiest fix is running your logic sequentially.

futures = []

with concurrent.futures.ThreadPoolExecutor() as pool:
    futures.append(pool.submit(task))

    while futures:
        for future in concurrent.futures.as_completed(futures):
            futures.remove(future)

            for new_task in future.result():
                futures.append(pool.submit(new_task))

Note that this code might cause an exponential submission of tasks to the Executor.

like image 58
noxdafox Avatar answered Oct 18 '22 20:10

noxdafox


This solution is guaranteed to max out processing at any given point. The use of sleep is not that elegant but this is the best I have so far.

with concurrent.futures.ThreadPoolExecutor() as executor:
    pending_tasks = 1
    future = executor.submit(my_task)
    def callback(future):
        nonlocal pending_tasks
        for another_task in future.result():
            pending_tasks += 1
            future = executor.submit(another_task)
            future.add_done_callback(callback)
        pending_tasks -= 1
    future.add_done_callback(callback)
    while pending_tasks:
        time.sleep(10)
like image 38
Uri Avatar answered Oct 18 '22 22:10

Uri