Context
I recently posted a timer class for review on Code Review. I'd had a gut feeling there were concurrency bugs as I'd once seen 1 unit test fail, but was unable to reproduce the failure. Hence my post to code review.
I got some great feedback highlighting various race conditions in the code. (I thought) I understood the problem and the solution, but before making any fixes, I wanted to expose the bugs with a unit test. When I tried, I realised it was difficult. Various stack exchange answers suggested I'd have to control the execution of threads to expose the bug(s) and any contrived timing would not necessarily be portable to a different machine. This seemed like a lot of accidental complexity beyond the problem I was trying to solve.
Instead I tried using the best static analysis (SA) tool for python, PyLint, to see if it'd pick out any of the bugs, but it couldn't. Why could a human find the bugs through code review (essentially SA), but a SA tool could not?
Afraid of trying to get Valgrind working with python (which sounded like yak-shaving), I decided to have a bash at fixing the bugs without reproducing them first. Now I'm in a pickle.
Here's the code now.
from threading import Timer, Lock from time import time class NotRunningError(Exception): pass class AlreadyRunningError(Exception): pass class KitchenTimer(object): ''' Loosely models a clockwork kitchen timer with the following differences: You can start the timer with arbitrary duration (e.g. 1.2 seconds). The timer calls back a given function when time's up. Querying the time remaining has 0.1 second accuracy. ''' PRECISION_NUM_DECIMAL_PLACES = 1 RUNNING = "RUNNING" STOPPED = "STOPPED" TIMEUP = "TIMEUP" def __init__(self): self._stateLock = Lock() with self._stateLock: self._state = self.STOPPED self._timeRemaining = 0 def start(self, duration=1, whenTimeup=None): ''' Starts the timer to count down from the given duration and call whenTimeup when time's up. ''' with self._stateLock: if self.isRunning(): raise AlreadyRunningError else: self._state = self.RUNNING self.duration = duration self._userWhenTimeup = whenTimeup self._startTime = time() self._timer = Timer(duration, self._whenTimeup) self._timer.start() def stop(self): ''' Stops the timer, preventing whenTimeup callback. ''' with self._stateLock: if self.isRunning(): self._timer.cancel() self._state = self.STOPPED self._timeRemaining = self.duration - self._elapsedTime() else: raise NotRunningError() def isRunning(self): return self._state == self.RUNNING def isStopped(self): return self._state == self.STOPPED def isTimeup(self): return self._state == self.TIMEUP @property def timeRemaining(self): if self.isRunning(): self._timeRemaining = self.duration - self._elapsedTime() return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES) def _whenTimeup(self): with self._stateLock: self._state = self.TIMEUP self._timeRemaining = 0 if callable(self._userWhenTimeup): self._userWhenTimeup() def _elapsedTime(self): return time() - self._startTime
Question
In the context of this code example, how can I expose the race conditions, fix them, and prove they're fixed?
Extra points
extra points for a testing framework suitable for other implementations and problems rather than specifically to this code.
Takeaway
My takeaway is that the technical solution to reproduce the identified race conditions is to control the synchronism of two threads to ensure they execute in the order that will expose a bug. The important point here is that they are already identified race conditions. The best way I've found to identify race conditions is to put your code up for code review and encourage more expert people analyse it.
There are many types of race conditions, although a common type of race condition is when two or more threads attempt to change the same data variable. NOTE: Race conditions are a real problem in Python when using threads, even in the presence of the global interpreter lock (GIL).
Race conditions can be avoided by proper thread synchronization in critical sections. Thread synchronization can be achieved using a synchronized block of Java code. Thread synchronization can also be achieved using other synchronization constructs like locks or atomic variables like java.
The most common solution to testing thread (un)safe code is to start a lot of threads and hope for the best. The problem I, and I can imagine others, have with this is that it relies on chance and it makes tests 'heavy'.
As I ran into this a while ago I wanted to go for precision instead of brute force. The result is a piece of test code to cause race-conditions by letting the threads race neck to neck.
spam = [] def set_spam(): spam[:] = foo() use(spam)
If set_spam
is called from several threads, a race condition exists between modification and use of spam
. Let's try to reproduce it consistently.
class TriggeredThread(threading.Thread): def __init__(self, sequence=None, *args, **kwargs): self.sequence = sequence self.lock = threading.Condition() self.event = threading.Event() threading.Thread.__init__(self, *args, **kwargs) def __enter__(self): self.lock.acquire() while not self.event.is_set(): self.lock.wait() self.event.clear() def __exit__(self, *args): self.lock.release() if self.sequence: next(self.sequence).trigger() def trigger(self): with self.lock: self.event.set() self.lock.notify()
Then to demonstrate the use of this thread:
spam = [] # Use a list to share values across threads. results = [] # Register the results. def set_spam(): thread = threading.current_thread() with thread: # Acquires the lock. # Set 'spam' to thread name spam[:] = [thread.name] # Thread 'releases' the lock upon exiting the context. # The next thread is triggered and this thread waits for a trigger. with thread: # Since each thread overwrites the content of the 'spam' # list, this should only result in True for the last thread. results.append(spam == [thread.name]) threads = [ TriggeredThread(name='a', target=set_spam), TriggeredThread(name='b', target=set_spam), TriggeredThread(name='c', target=set_spam)] # Create a shifted sequence of threads and share it among the threads. thread_sequence = itertools.cycle(threads[1:] + threads[:1]) for thread in threads: thread.sequence = thread_sequence # Start each thread [thread.start() for thread in threads] # Trigger first thread. # That thread will trigger the next thread, and so on. threads[0].trigger() # Wait for each thread to finish. [thread.join() for thread in threads] # The last thread 'has won the race' overwriting the value # for 'spam', thus [False, False, True]. # If set_spam were thread-safe, all results would be true. assert results == [False, False, True], "race condition triggered" assert results == [True, True, True], "code is thread-safe"
I think I explained enough about this construction so you can implement it for your own situation. I think this fits the 'extra points' section quite nicely:
extra points for a testing framework suitable for other implementations and problems rather than specifically to this code.
Each threading issue is solved in it's own specific way. In the example above I caused a race-condition by sharing a value across threads. Similar problems can occur when using global variables, such as a module attribute. The key to solving such issues may be to use a thread-local storage:
# The thread local storage is a global. # This may seem weird at first, but it isn't actually shared among threads. data = threading.local() data.spam = [] # This list only exists in this thread. results = [] # Results *are* shared though. def set_spam(): thread = threading.current_thread() # 'get' or set the 'spam' list. This actually creates a new list. # If the list was shared among threads this would cause a race-condition. data.spam = getattr(data, 'spam', []) with thread: data.spam[:] = [thread.name] with thread: results.append(data.spam == [thread.name]) # Start the threads as in the example above. assert all(results) # All results should be True.
A common threading issue is the problem of multiple threads reading and/or writing to a data holder concurrently. This problem is solved by implementing a read-write lock. The actual implementation of a read-write lock may differ. You may choose a read-first lock, a write-first lock or just at random.
I'm sure there are examples out there describing such locking techniques. I may write an example later as this is quite a long answer already. ;-)
Have a look at the threading module documentation and experiment with it a bit. As each threading issue is different, different solutions apply.
While on the subject of threading, have a look at the Python GIL (Global Interpreter Lock). It is important to note that threading may not actually be the best approach in optimizing performance (but this is not your goal). I found this presentation pretty good: https://www.youtube.com/watch?v=zEaosS1U5qY
Traditionally, forcing race conditions in multithreaded code is done with semaphores, so you can force a thread to wait until another thread has achieved some edge condition before continuing.
For example, your object has some code to check that start
is not called if the object is already running. You could force this condition to make sure it behaves as expected by doing something like this:
KitchenTimer
AlreadyRunningError
To do some of this you may need to extend the KitchenTimer class. Formal unit tests will often use mock objects which are defined to block at critical times. Mock objects are a bigger topic than I can address here, but googling "python mock object" will turn up a lot of documentation and many implementations to choose from.
Here's a way that you could force your code to throw AlreadyRunningError
:
import threading class TestKitchenTimer(KitchenTimer): _runningLock = threading.Condition() def start(self, duration=1, whenTimeUp=None): KitchenTimer.start(self, duration, whenTimeUp) with self._runningLock: print "waiting on _runningLock" self._runningLock.wait() def resume(self): with self._runningLock: self._runningLock.notify() timer = TestKitchenTimer() # Start the timer in a subthread. This thread will block as soon as # it is started. thread_1 = threading.Thread(target = timer.start, args = (10, None)) thread_1.start() # Attempt to start the timer in a second thread, causing it to throw # an AlreadyRunningError. try: thread_2 = threading.Thread(target = timer.start, args = (10, None)) thread_2.start() except AlreadyRunningError: print "AlreadyRunningError" timer.resume() timer.stop()
Reading through the code, identify some of the boundary conditions you want to test, then think about where you would need to pause the timer to force that condition to arise, and add Conditions, Semaphores, Events, etc. to make it happen. e.g. what happens if, just as the timer runs the whenTimeUp callback, another thread tries to stop it? You can force that condition by making the timer wait as soon as it's entered _whenTimeUp:
import threading class TestKitchenTimer(KitchenTimer): _runningLock = threading.Condition() def _whenTimeup(self): with self._runningLock: self._runningLock.wait() KitchenTimer._whenTimeup(self) def resume(self): with self._runningLock: self._runningLock.notify() def TimeupCallback(): print "TimeupCallback was called" timer = TestKitchenTimer() # The timer thread will block when the timer expires, but before the callback # is invoked. thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback)) thread_1.start() sleep(2) # The timer is now blocked. In the parent thread, we stop it. timer.stop() print "timer is stopped: %r" % timer.isStopped() # Now allow the countdown thread to resume. timer.resume()
Subclassing the class you want to test isn't an awesome way to instrument it for testing: you'll have to override basically all of the methods in order to test race conditions in each one, and at that point there's a good argument to be made that you're not really testing the original code. Instead, you may find it cleaner to put the semaphores right in the KitchenTimer object but initialized to None by default, and have your methods check if testRunningLock is not None:
before acquiring or waiting on the lock. Then you can force races on the actual code that you're submitting.
Some reading on Python mock frameworks that may be helpful. In fact, I'm not sure that mocks would be helpful in testing this code: it's almost entirely self-contained and doesn't rely on many external objects. But mock tutorials sometimes touch on issues like these. I haven't used any of these, but the documentation on these like a good place to get started:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With