I have a threaded class whose loop needs to execute 4 times every second. I know that I can do something like
do_stuff()
time.sleep(0.25)
but the problem is that it doesn't account for the time it takes to run do_stuff(). Effectively this needs to be a real-time thread. Is there a way to accomplish this? Ideally the thread would still be put to sleep when not executing code.
import threading

def work():
    # Schedule the next call first, so the time spent below does not delay it
    threading.Timer(0.25, work).start()
    print("stackoverflow")

work()
The above makes sure that work is run four times per second. The idea is that each call first "queues" a call to itself that will run 0.25 seconds in the future, without hanging around waiting for that to happen.
Because of this it can do its work (almost) entirely uninterrupted, and we get extremely close to executing the function exactly 4 times per second.
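Each Timer is started a fixed 0.25 seconds "from now", so small scheduling delays can add up over a long run. A minimal sketch of one way to limit that, assuming it is acceptable to aim each call at an absolute deadline taken from time.monotonic(). The names run_at_fixed_rate and tick are made up for this example and are not part of the threading module:

```python
import threading
import time

def run_at_fixed_rate(interval, worker_func, iterations):
    """Call worker_func `iterations` times, aiming at start + k * interval.

    Hypothetical helper for illustration. Returns an Event that is set
    once the last call has finished.
    """
    start = time.monotonic()
    done = threading.Event()

    def tick(k):
        if k + 1 < iterations:
            # Delay until the next absolute deadline rather than
            # "interval from now", so lateness does not accumulate.
            delay = max(0.0, start + (k + 1) * interval - time.monotonic())
            threading.Timer(delay, tick, [k + 1]).start()
        worker_func()
        if k + 1 >= iterations:
            done.set()

    tick(0)
    return done
```

Calling run_at_fixed_rate(0.25, work_once, 8).wait() would block until eight calls have run, where work_once stands for any function that does one unit of work without rescheduling itself.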
More about threading.Timer can be found in the Python documentation for the threading module.
Even though the previous function works as expected, you could create a helper function to deal with future timed events.
Something like the following is sufficient for this example. Hopefully the code speaks for itself; it is not as advanced as it might appear.
Treat it as inspiration for implementing your own wrapper to fit your exact needs.
import threading

def do_every(interval, worker_func, iterations=0):
    # iterations == 0 means "repeat forever"; otherwise stop rescheduling at the last call
    if iterations != 1:
        threading.Timer(
            interval,
            do_every, [interval, worker_func, 0 if iterations == 0 else iterations - 1]
        ).start()
    worker_func()

def print_hw():
    print("hello world")

def print_so():
    print("stackoverflow")

# call print_so every second, 5 times total
do_every(1, print_so, 5)

# call print_hw two times per second, forever
do_every(0.5, print_hw)
I took a slightly different approach: a single Thread that loops in a while loop. For me the main advantage is more control over the interval: you can stop the IntervalTimer with its .stop() method.
Code:
from threading import Thread, Event
from time import sleep

# StoppableThread is from user Dolphin, from http://stackoverflow.com/questions/5849484/how-to-exit-a-multithreaded-program
class StoppableThread(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.stop_event = Event()

    def stop(self):
        # is_alive() replaces isAlive(), which was removed in Python 3.9
        if self.is_alive():
            # set event to signal thread to terminate
            self.stop_event.set()
            # block calling thread until thread really has terminated
            self.join()

class IntervalTimer(StoppableThread):
    def __init__(self, interval, worker_func):
        super().__init__()
        self._interval = interval
        self._worker_func = worker_func

    def run(self):
        while not self.stop_event.is_set():
            self._worker_func()
            sleep(self._interval)
Usage:
def hw():
    print("Hello World")

interval = IntervalTimer(1, hw)
interval.start()
sleep(10)
interval.stop()
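Note that the loop above sleeps for a full interval after each call, so the real period is the interval plus however long the worker takes, which is the problem the question describes. A minimal sketch of a variant that keeps a running deadline from time.monotonic() and waits on the stop event instead of sleeping, so stop() also takes effect without waiting out the interval. FixedRateTimer is a made-up name for this example, and it assumes the worker usually finishes within one interval:

```python
import threading
import time

class FixedRateTimer(threading.Thread):
    """Hypothetical variant of IntervalTimer that compensates for work time."""

    def __init__(self, interval, worker_func):
        super().__init__()
        self._interval = interval
        self._worker_func = worker_func
        self.stop_event = threading.Event()

    def run(self):
        next_deadline = time.monotonic()
        while not self.stop_event.is_set():
            self._worker_func()
            # Advance the deadline by exactly one interval, regardless of
            # how long the worker took, then wait only for what is left.
            next_deadline += self._interval
            remaining = max(0.0, next_deadline - time.monotonic())
            # Event.wait returns early if stop() sets the event.
            self.stop_event.wait(remaining)

    def stop(self):
        self.stop_event.set()
        self.join()
```

It is used the same way as IntervalTimer: create it with an interval and a function, call start(), and later call stop(). If the worker runs longer than the interval, the wait drops to zero and calls run back to back until the schedule catches up.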