I have a function that must never be called with the same value simultaneously from two threads. To enforce this, I have a defaultdict that spawns new threading.Lock objects for a given key. Thus, my code looks similar to this:
    from collections import defaultdict
    import threading

    lock_dict = defaultdict(threading.Lock)

    def f(x):
        with lock_dict[x]:
            print("Locked for value x")
The problem is that I cannot figure out how to safely delete the lock from the defaultdict once it's no longer needed. Without doing this, my program has a memory leak that becomes noticeable when f is called with many different values of x.
I cannot simply del lock_dict[x] at the end of f, because if another thread is waiting for the lock, that second thread will then acquire a lock that's no longer associated with lock_dict[x], and thus two threads could end up simultaneously calling f with the same value of x.
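The failure described above can be replayed step by step in a single thread. This is only a sketch: the names lock_a, lock_b and lock_c are made up here to stand for the lock object each of three threads would see, and no real threads are started.

```python
from collections import defaultdict
import threading

lock_dict = defaultdict(threading.Lock)

# "Thread A" looks up the lock for "a" and holds it.
lock_a = lock_dict["a"]
lock_a.acquire()

# "Thread B" looks up the same lock object and would now block on it.
lock_b = lock_dict["a"]
same_before_delete = lock_b is lock_a

# "Thread A" finishes: it deletes the entry and releases the lock.
del lock_dict["a"]
lock_a.release()

# "Thread B" wakes up holding the old, orphaned lock.
lock_b.acquire()

# "Thread C" arrives: the key is missing, so defaultdict creates a fresh lock.
lock_c = lock_dict["a"]
c_got_in = lock_c.acquire(False)  # non-blocking attempt

print(same_before_delete, lock_c is lock_b, c_got_in)
```

At the end, lock_b is still held and the non-blocking acquire on lock_c also succeeds, so "B" and "C" would both be inside f("a") at the same time.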
I'd use a different approach:
    import threading

    fcond = threading.Condition()
    fargs = set()

    def f(x):
        with fcond:
            while x in fargs:
                fcond.wait()
            fargs.add(x)  # this thread has exclusive rights to use `x`

        # do useful stuff with x
        # any other thread trying to call f(x) will
        # block in the .wait() above

        with fcond:
            fargs.remove(x)     # we're done with x
            fcond.notify_all()  # let blocked threads (if any) proceed
Conditions have a learning curve, but once it's climbed they make it much easier to write correct thread-safe, race-free code.
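A small harness can check that the condition-based f() never runs twice at once for the same argument. This is a sketch under assumptions: the bookkeeping names (active, max_active, stats_lock) and the sleep standing in for real work are invented for the test. The try/finally is also an addition, so that an exception in the work doesn't leave x stuck in fargs.

```python
import threading
import time

fcond = threading.Condition()
fargs = set()

# Hypothetical bookkeeping, only used to observe concurrency per argument.
active = {}      # x -> number of threads currently inside f(x)
max_active = {}  # x -> highest value of active[x] ever seen
stats_lock = threading.Lock()

def f(x):
    with fcond:
        while x in fargs:
            fcond.wait()
        fargs.add(x)  # this thread now owns `x`
    try:
        with stats_lock:
            active[x] = active.get(x, 0) + 1
            max_active[x] = max(max_active.get(x, 0), active[x])
        time.sleep(0.01)  # stand-in for the useful work
        with stats_lock:
            active[x] -= 1
    finally:
        with fcond:
            fargs.remove(x)
            fcond.notify_all()

# 12 threads competing for 3 distinct argument values.
threads = [threading.Thread(target=f, args=(i % 3,)) for i in range(12)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(max_active, fargs)
```

If the exclusion works, every value in max_active is 1, and fargs is empty afterwards, so nothing accumulates per argument.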
@JimMischel asked in a comment whether the original's use of defaultdict was subject to races. Good question!
The answer is - alas - "you'll have to stare at your specific Python's implementation".
Assuming the CPython implementation: if any of the code invoked by defaultdict to supply a default invokes Python code, or C code that releases the GIL (global interpreter lock), then 2 (or more) threads could "simultaneously" invoke with lock_dict[x] with the same x not already in the dict, and:
1. One thread sees that x isn't in the dict, gets a lock, then loses its timeslice (before setting x in the dict).
2. Another thread sees that x isn't in the dict, and also gets a lock.
3. Both threads then go on to run the body of f(x).
Staring at the source for 3.4.0a4+ (the current development head), defaultdict and threading.Lock are both implemented by C code that doesn't release the GIL. I don't recall whether earlier versions did or didn't, at various times, implement all or parts of defaultdict or threading.Lock in Python.
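If you'd rather not depend on those implementation details, one option is to do the lookup-or-create step under an explicit lock. This is a minimal sketch, not part of the original code: get_lock, lock_table and _guard are made-up names. It only closes the creation race and does nothing about deleting locks that are no longer needed.

```python
import threading

_guard = threading.Lock()  # protects lock_table (hypothetical name)
lock_table = {}            # maps x to its lock

def get_lock(x):
    """Return the lock for x, creating it at most once."""
    with _guard:
        lock = lock_table.get(x)
        if lock is None:
            lock = lock_table[x] = threading.Lock()
        return lock

def f(x):
    with get_lock(x):
        pass  # only one thread at a time runs here for a given x
```

Because the check and the insertion happen while _guard is held, two threads asking for the same missing x can't each end up with a different lock.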
My suggested alternative code is full of stuff implemented in Python (all threading.Condition methods), but is race-free by design - even if you're using an old version of Python with sets also implemented in Python (the set is only accessed under the protection of the condition variable's lock).
Without conditions, this seems to be much harder. In the original approach, I believe you need to keep a count of threads wanting to use x, and you need a lock to protect those counts and to protect the dictionary. The best code I've come up with for that is so long-winded that it seems sanest to put it in a context manager. To use, create an argument locker per function that needs it:
    farglocker = ArgLocker()  # for function `f()`
and then the body of f() can be coded simply:
    def f(x):
        with farglocker(x):
            # only one thread at a time can run with argument `x`
            pass
Of course the condition approach could also be wrapped in a context manager. Here's the code:
    import threading

    class ArgLocker:
        def __init__(self):
            self.xs = dict()  # maps x to (lock, count) pair
            self.lock = threading.Lock()

        def __call__(self, x):
            return AllMine(self.xs, self.lock, x)

    class AllMine:
        def __init__(self, xs, lock, x):
            self.xs = xs
            self.lock = lock
            self.x = x

        def __enter__(self):
            x = self.x
            with self.lock:
                xlock = self.xs.get(x)
                if xlock is None:
                    # first thread to want x: create the lock and take it
                    xlock = threading.Lock()
                    xlock.acquire()
                    count = 0
                else:
                    xlock, count = xlock
                self.xs[x] = xlock, count + 1

            if count:  # x was already known - wait for it
                xlock.acquire()
            assert xlock.locked()

        def __exit__(self, *args):
            x = self.x
            with self.lock:
                xlock, count = self.xs[x]
                assert xlock.locked()
                assert count > 0
                count -= 1
                if count:
                    self.xs[x] = xlock, count
                else:
                    del self.xs[x]  # nobody else wants x: drop the entry
                xlock.release()
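For comparison, here is one way the condition approach might be wrapped in a context manager too. It's a sketch, not a drop-in replacement: the class name CondArgLocker and its attributes cond and busy are made up for illustration, and it leans on contextlib.contextmanager so that the release step runs even if the body raises.

```python
import threading
from contextlib import contextmanager

class CondArgLocker:
    def __init__(self):
        self.cond = threading.Condition()
        self.busy = set()  # arguments currently in use by some thread

    @contextmanager
    def __call__(self, x):
        with self.cond:
            while x in self.busy:
                self.cond.wait()
            self.busy.add(x)  # this thread now owns `x`
        try:
            yield
        finally:
            with self.cond:
                self.busy.remove(x)     # we're done with x
                self.cond.notify_all()  # let blocked threads re-check

farglocker = CondArgLocker()  # one per function, as with ArgLocker

def f(x):
    with farglocker(x):
        pass  # only one thread at a time can run with argument `x`
```

The set holds an argument only while it's in use, so there's nothing left to clean up afterwards.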
So which way is better? Using conditions ;-) That way is "almost obviously correct", but the lock-per-argument (LPA) approach is a bit of a head-scratcher. The LPA approach does have the advantage that when a thread is done with x, the only threads allowed to proceed are those wanting to use the same x; using conditions, the .notify_all() wakes all threads blocked waiting on any argument. But unless there's very heavy contention among threads trying to use the same arguments, this isn't going to matter much: using conditions, the threads woken up that aren't waiting on x stay awake only long enough to see that x in fargs is true, and then immediately block (.wait()) again.