Given a function f
, what's the best way to code a progress bar for f
? I.e., have a real-time progress bar that updates during the execution of f
. Note that I can't change f
(it's a function from another library), so there's no way to insert a pbar.update
call in f
(hence this is a post regarding progress bars for non-loop functions). Other SO posts have addressed this problem under the condition that you can change the code in f
, but I can't find/think of a solution when I don't have access to the contents of f
.
Would I have to use threading or multiprocessing to achieve something like this?
Something like:
@progress_bar
def func_wrapper(*args, **kwargs):
return f(*args, **kwargs)
or:
start_progress_bar()
f()
Any help is appreciated!
UPDATE: I've taken the code provided in @Acorn's answer and rewritten it in decorator form.
import concurrent.futures
import functools
import time
from tqdm import tqdm
def progress_bar(expected_time, increments=10):
def _progress_bar(func):
def timed_progress_bar(future, expected_time, increments=10):
"""
Display progress bar for expected_time seconds.
Complete early if future completes.
Wait for future if it doesn't complete in expected_time.
"""
interval = expected_time / increments
with tqdm(total=increments) as pbar:
for i in range(increments - 1):
if future.done():
# finish the progress bar
# not sure if there's a cleaner way to do this?
pbar.update(increments - i)
return
else:
time.sleep(interval)
pbar.update()
# if the future still hasn't completed, wait for it.
future.result()
pbar.update()
@functools.wraps(func)
def _func(*args, **kwargs):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(func, *args, **kwargs)
timed_progress_bar(future, expected_time, increments)
return future.result()
return _func
return _progress_bar
if __name__ == "__main__":
@progress_bar(expected_time=11)
def test_func():
time.sleep(10)
return "result"
print(test_func()) # prints "result"
If the function doesn't allow you to take action after units of work, i.e. by exposing a generator interface or callback of some sort, then the only solution would be to use a modified version of the function, or do some kind of monkeypatching.
The solution would be specific to the code in question.
So if you don't mind the progress bar not accurately reflecting the progress, and just using a time estimate you could do something like this.
import concurrent.futures
import time
from tqdm import tqdm
def timed_future_progress_bar(future, expected_time, increments=10):
"""
Display progress bar for expected_time seconds.
Complete early if future completes.
Wait for future if it doesn't complete in expected_time.
"""
interval = expected_time / increments
with tqdm(total=increments) as pbar:
for i in range(increments - 1):
if future.done():
# finish the progress bar
# not sure if there's a cleaner way to do this?
pbar.update(increments - i)
return
else:
time.sleep(interval)
pbar.update()
# if the future still hasn't completed, wait for it.
future.result()
pbar.update()
def blocking_job():
time.sleep(2)
return 'result'
def main():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(blocking_job)
timed_future_progress_bar(future, 5)
print(f'Work done: {future.result()}')
main()
This should behave sensibly whether the job takes more or less time than expected. If the job runs longer than expected then the progress will wait at 90% until it completes.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With