I'm trying to submit further tasks based on the result of a task that has finished:
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future = executor.submit(my_task)
        def callback(future):
            for another_task in future.result():
                future = executor.submit(another_task)
                future.add_done_callback(callback)
        future.add_done_callback(callback)
but I'm getting:
    RuntimeError: cannot schedule new futures after shutdown
What's the best way to make the executor wait for the callback? A semaphore?
Ideally the solution should transfer well if ThreadPoolExecutor is replaced with ProcessPoolExecutor.
The callback is executed in a separate thread, not in the main thread. By the time your callback logic starts executing, the main thread has most likely already left the context manager, which shuts down the Executor. Once shutdown has begun, submit() refuses new work, which is why you observe the RuntimeError.
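The timing problem described above can be reproduced with a small sketch. The names slow_task and errors are made up for illustration, and the 0.2 second sleep is only an assumption chosen so that the main thread leaves the with block before the task finishes. The error is caught inside the callback because exceptions raised in done-callbacks are logged rather than propagated.

```python
import concurrent.futures
import time

errors = []  # RuntimeErrors caught inside the callback (hypothetical helper list)

def slow_task():
    # Sleep long enough for the main thread to leave the with block first.
    time.sleep(0.2)
    return [slow_task]  # one follow-up task

def callback(future):
    for another_task in future.result():
        try:
            executor.submit(another_task)
        except RuntimeError as exc:
            errors.append(exc)

with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.submit(slow_task).add_done_callback(callback)
# Leaving the block calls executor.shutdown(wait=True): the executor stops
# accepting new work immediately, then waits for the running task to finish.

print(errors)
```

Because the shutdown waits for the running task, the callback has already run by the time the print statement is reached.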
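The easiest fix is to run your logic sequentially: collect the futures and submit the follow-up tasks from the main thread while the Executor is still open.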
    import concurrent.futures

    futures = []
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures.append(pool.submit(task))
        while futures:
            # as_completed() works on a snapshot, so the list can be modified here
            for future in concurrent.futures.as_completed(futures):
                futures.remove(future)
                for new_task in future.result():
                    futures.append(pool.submit(new_task))
Note that if each task returns several new tasks, the number of tasks submitted to the Executor can grow exponentially.
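A possible variation on the loop above, sketched here under assumptions: instead of draining a whole batch with as_completed(), it calls concurrent.futures.wait() with return_when=FIRST_COMPLETED, so follow-up tasks are submitted as soon as any single future finishes. The helper run_all and the toy tasks root and leaf are hypothetical names, not part of the original question.

```python
import concurrent.futures

def run_all(executor, first_task):
    """Run first_task and every task it transitively returns; return the task count."""
    pending = {executor.submit(first_task)}
    completed = 0
    while pending:
        # Block until at least one future is done; the rest stay pending.
        done, pending = concurrent.futures.wait(
            pending, return_when=concurrent.futures.FIRST_COMPLETED)
        for future in done:
            completed += 1
            for new_task in future.result():
                pending.add(executor.submit(new_task))
    return completed

# Toy tasks: each task returns an iterable of follow-up tasks.
def leaf():
    return []

def root():
    return [leaf, leaf, leaf]

with concurrent.futures.ThreadPoolExecutor() as executor:
    total = run_all(executor, root)
print(total)  # 4: root plus three leaves
```

All submissions happen in the main thread inside the with block, so this should carry over to ProcessPoolExecutor as long as the tasks and their return values can be pickled (for example, module-level functions).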
This solution keeps the executor as busy as possible, because each follow-up task is submitted as soon as the task that produced it finishes. Polling with sleep is not that elegant, but this is the best I have so far.
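    import concurrent.futures
    import time

    # NOTE: nonlocal only works if this block lives inside a function;
    # at module level, use global instead.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        pending_tasks = 1
        future = executor.submit(my_task)
        def callback(future):
            nonlocal pending_tasks
            # Callbacks run in worker threads, so these updates are unsynchronized.
            for another_task in future.result():
                pending_tasks += 1
                future = executor.submit(another_task)
                future.add_done_callback(callback)
            pending_tasks -= 1
        future.add_done_callback(callback)
        # Stay inside the with block until every task has finished.
        while pending_tasks:
            time.sleep(10)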
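One way to avoid the sleep loop, sketched here as an assumption rather than as the author's method, is to guard the counter with a threading.Lock and have the main thread block on a threading.Event that is set when the counter reaches zero. The names run_all, root, branch and leaf are hypothetical. A task that raises is treated as producing no follow-up tasks, which is a simplification.

```python
import concurrent.futures
import threading

def run_all(executor, first_task):
    """Run first_task and all follow-up tasks; return how many tasks finished."""
    lock = threading.Lock()
    all_done = threading.Event()
    pending = 1   # the first task
    finished = 0

    def callback(future):
        nonlocal pending, finished
        try:
            new_tasks = list(future.result())
        except Exception:
            new_tasks = []  # simplification: a failed task spawns nothing
        # Count the children before this task is marked finished, so the
        # counter cannot drop to zero while work is still outstanding.
        with lock:
            pending += len(new_tasks)
        for task in new_tasks:
            executor.submit(task).add_done_callback(callback)
        with lock:
            pending -= 1
            finished += 1
            if pending == 0:
                all_done.set()

    executor.submit(first_task).add_done_callback(callback)
    all_done.wait()  # blocks without polling until every task is done
    return finished

# Toy tasks: each returns an iterable of follow-up tasks.
def leaf():
    return []

def branch():
    return [leaf, leaf]

def root():
    return [branch, leaf]

with concurrent.futures.ThreadPoolExecutor() as executor:
    count = run_all(executor, root)
print(count)  # 5: root, branch and three leaves
```

The lock is released before submitting, since add_done_callback() may invoke the callback immediately in the same thread when a future is already done. Callbacks run in the process that added them, so the same pattern should also work with ProcessPoolExecutor, provided the tasks and their results can be pickled.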