I have a section of code that I need to get the output from:
gps = get_gps_data()
Though if it takes too long to get the output from get_gps_data()
, I would like to cancel the process and set gps
to None
. Modifying the function is impossible, so is there a way to specify the max amount of time to wait for some code to run and abort if it reaches that time, say 5 seconds?
You can do some thing like below. This will work for any logic or function that you want to check and stop after some time. The function that you want to check and cancel is executed in a separate thread and monitored. After waiting for 3 seconds, it is cancelled.
Just put your code inside the test
function (you can rename the functions). test
function tries to sleep for 100 seconds (could be any logic). But the main call future.result(3)
, waits only for 3 seconds else TimeoutError is thrown and some other logic follows.
import time
import concurrent.futures as futures
def test():
print('Sleeping for 100 seconds')
time.sleep(100)
# Add your code here
# gps = get_gps_data()
return 'Done'
# Main
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test)
try:
resp = future.result(3)
except futures.TimeoutError:
print('Some other resp')
else:
print(resp)
executor._threads.clear()
futures.thread._threads_queues.clear()
You can try to minimise the wait time inside the test
function to some value less than 3 seconds. In this case, the original result returned from test
function will be used.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With