I'm trying to submit further tasks based on the result of a task that has completed:
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(my_task)
        def callback(future):
            for another_task in future.result():
                future = executor.submit(another_task)
                future.add_done_callback(callback)
        future.add_done_callback(callback)
but I'm getting:
RuntimeError: cannot schedule new futures after shutdown
What's the best way to make the executor hold for the callback? A semaphore?
Ideally the solution should transfer well if ThreadPoolExecutor is replaced with ProcessPoolExecutor.
The callback is not executed in the main thread: it usually runs in the worker thread that completed the future. By the time your callback logic starts executing, the main thread has most likely already reached the end of the with block, which shuts down the Executor. That's why you observe the RuntimeError.
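The failure described above can be reproduced deterministically. The following is only a minimal sketch: the function name reproduce_shutdown_error and the two events are illustrative assumptions, and shutdown(wait=False) stands in for leaving the with block, since both set the same "shut down" state on the executor.

```python
import concurrent.futures
import threading

def reproduce_shutdown_error():
    """Return the messages raised when a callback submits after shutdown."""
    release = threading.Event()   # holds the first task until shutdown has begun
    finished = threading.Event()  # set once the callback has run
    errors = []

    def first_task():
        release.wait()
        return [lambda: None]  # one follow-up task (never actually run)

    def callback(future):
        try:
            for another_task in future.result():
                executor.submit(another_task)  # rejected: executor is shut down
        except RuntimeError as exc:
            errors.append(str(exc))
        finally:
            finished.set()

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    executor.submit(first_task).add_done_callback(callback)
    executor.shutdown(wait=False)  # stands in for leaving the with block
    release.set()                  # let the first task finish only now
    finished.wait(timeout=5)
    return errors
```

Without the try/except, the RuntimeError would only be logged by concurrent.futures, because exceptions raised inside done-callbacks are caught and logged rather than propagated.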
The easiest fix is to submit the follow-up tasks from the main thread, inside the with block, instead of from a callback.
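    import concurrent.futures

    futures = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures.append(pool.submit(task))
        # Keep looping until no submitted task is left unprocessed.
        while futures:
            # as_completed() works on a snapshot, so mutating the list here is safe.
            for future in concurrent.futures.as_completed(futures):
                futures.remove(future)
                # Each result is an iterable of follow-up tasks.
                for new_task in future.result():
                    futures.append(pool.submit(new_task))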
Note that if every task returns several new tasks, the number of tasks submitted to the Executor can grow exponentially.
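A variation on the loop above is to block on concurrent.futures.wait with FIRST_COMPLETED, so that follow-up tasks are submitted as soon as any single future finishes rather than at the end of an as_completed pass. This is a sketch under assumptions: run_until_drained and max_workers=4 are illustrative choices, and every task is assumed to return an iterable of zero-argument callables.

```python
import concurrent.futures

def run_until_drained(first_task, max_workers=4):
    """Run first_task and every task it transitively returns.

    Returns the number of tasks that completed.
    """
    completed = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(first_task)}
        while pending:
            # Wake up as soon as at least one future has finished.
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for future in done:
                completed += 1
                # Submit follow-up tasks from the main thread, inside the with block.
                for new_task in future.result():
                    pending.add(pool.submit(new_task))
    return completed
```

Because all submissions happen in the main thread, the same structure should carry over to ProcessPoolExecutor, provided the tasks and their results can be pickled.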
This solution keeps the executor as busy as possible, because each follow-up task is submitted as soon as its parent finishes. The use of sleep is not that elegant, but this is the best I have so far.
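    import concurrent.futures
    import threading
    import time

    # nonlocal needs an enclosing function; run_all is an illustrative name.
    def run_all(my_task):
        pending_tasks = 1
        lock = threading.Lock()  # callbacks run in worker threads, so guard the counter
        with concurrent.futures.ThreadPoolExecutor() as executor:
            def callback(future):
                nonlocal pending_tasks
                for another_task in future.result():
                    with lock:
                        pending_tasks += 1
                    executor.submit(another_task).add_done_callback(callback)
                with lock:
                    pending_tasks -= 1
            executor.submit(my_task).add_done_callback(callback)
            # Stay inside the with block until every task has finished.
            while pending_tasks:
                time.sleep(10)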
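One way to avoid the sleep is to have the last callback signal a threading.Event, which the main thread waits on before leaving the with block. This is a hedged sketch rather than a definitive implementation: run_with_callbacks, the completed counter, and max_workers=4 are assumptions added for illustration, and each task is assumed to return an iterable of zero-argument callables.

```python
import concurrent.futures
import threading

def run_with_callbacks(first_task, max_workers=4):
    """Run first_task and all follow-up tasks; return how many tasks completed."""
    lock = threading.Lock()
    all_done = threading.Event()
    pending = 1      # tasks submitted whose callback has not finished yet
    completed = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        def callback(future):
            nonlocal pending, completed
            try:
                for another_task in future.result():
                    with lock:
                        pending += 1  # count the child before the parent is released
                    executor.submit(another_task).add_done_callback(callback)
            finally:
                with lock:
                    pending -= 1
                    completed += 1
                    if pending == 0:
                        all_done.set()  # nothing left in flight

        executor.submit(first_task).add_done_callback(callback)
        all_done.wait()  # block inside the with block until the counter drains
    return completed
```

The counter is incremented for each child before the parent's own decrement, so it cannot reach zero while work is still outstanding. With ProcessPoolExecutor the callbacks still run in a thread of the submitting process, so the counter and event would stay in one process, though the tasks themselves would need to be picklable.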