
Garbage-collect a lock once no threads are asking for it

I have a function that must never be called with the same value simultaneously from two threads. To enforce this, I have a defaultdict that spawns new threading.Locks for a given key. Thus, my code looks similar to this:

from collections import defaultdict
import threading

lock_dict = defaultdict(threading.Lock)
def f(x):
    with lock_dict[x]:
        print("Locked for value x")

The problem is that I cannot figure out how to safely delete the lock from the defaultdict once it's no longer needed. Without doing this, my program has a memory leak that becomes noticeable when f is called with many different values of x.

I cannot simply del lock_dict[x] at the end of f: if another thread is already waiting for that lock, it will acquire a lock that is no longer associated with lock_dict[x], while a third thread arriving later gets a fresh lock from the defaultdict. Two threads could then end up calling f simultaneously with the same value of x.

Ponkadoodle asked Nov 06 '13 05:11


1 Answer

I'd use a different approach:

import threading

fcond = threading.Condition()
fargs = set()

def f(x):
    with fcond:
        while x in fargs:
            fcond.wait()
        fargs.add(x)  # this thread has exclusive rights to use `x`

    # do useful stuff with x
    # any other thread trying to call f(x) will
    # block in the .wait() above

    with fcond:
        fargs.remove(x)      # we're done with x
        fcond.notify_all()   # let blocked threads (if any) proceed

Conditions have a learning curve, but once it's climbed they make it much easier to write correct thread-safe, race-free code.
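One thing the snippet above leaves open is what happens if the "useful stuff" raises: x would stay in fargs forever and every later f(x) would block. Here is a minimal sketch of the same idea with a try/finally, plus a small check that no two threads ever work on the same x at once. The names active, max_active and stats_lock are bookkeeping I made up for the demonstration, and the sleep is just a stand-in for real work.

```python
import threading
import time

fcond = threading.Condition()
fargs = set()            # arguments currently claimed by some thread

# Hypothetical bookkeeping, only here to observe the behaviour.
active = {}              # x -> number of threads currently working on x
max_active = {}          # x -> highest value of active[x] ever seen
stats_lock = threading.Lock()

def f(x):
    with fcond:
        while x in fargs:
            fcond.wait()
        fargs.add(x)     # this thread has exclusive rights to use `x`
    try:
        with stats_lock:
            active[x] = active.get(x, 0) + 1
            max_active[x] = max(max_active.get(x, 0), active[x])
        time.sleep(0.01)  # stand-in for the real work
        with stats_lock:
            active[x] -= 1
    finally:
        # Runs even if the work raised, so `x` is always given back.
        with fcond:
            fargs.remove(x)
            fcond.notify_all()

# Six threads competing for two distinct arguments (0 and 1).
threads = [threading.Thread(target=f, args=(i % 2,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

After all threads finish, fargs is empty again, which is the "garbage collection" the question asks for: nothing is kept around for an argument that nobody is using.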

Thread safety of the original code

@JimMischel asked in a comment whether the original's use of defaultdict was subject to races. Good question!

The answer is - alas - "you'll have to stare at your specific Python's implementation".

Assuming the CPython implementation: if any of the code invoked by defaultdict to supply a default invokes Python code, or C code that releases the GIL (global interpreter lock), then 2 (or more) threads could "simultaneously" invoke with lock_dict[x] with the same x not already in the dict, and:

  1. Thread 1 sees that x isn't in the dict, gets a lock, then loses its timeslice (before setting x in the dict).
  2. Thread 2 sees that x isn't in the dict, and also gets a lock.
  3. One of those threads' locks ends up in the dict, but both threads execute f(x).
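The three steps above can be forced to happen deterministically. This is only a sketch under an assumption: SlowDefaultDict is a hypothetical dict subclass whose __missing__ is written in Python, standing in for a defaultdict whose default factory can be interrupted. The Barrier is there purely to hold both threads between "got a lock" and "stored it in the dict".

```python
import threading

class SlowDefaultDict(dict):
    """Hypothetical stand-in for a defaultdict with an interruptible factory."""

    def __init__(self, barrier):
        super().__init__()
        self.barrier = barrier

    def __missing__(self, key):
        lock = threading.Lock()   # steps 1 and 2: each thread gets its own lock
        self.barrier.wait()       # both threads are now past the "not in dict" check
        self[key] = lock          # step 3: whichever thread stores last wins
        return lock

barrier = threading.Barrier(2)
lock_dict = SlowDefaultDict(barrier)
seen = []                         # the lock object each thread ended up with

def worker():
    seen.append(lock_dict["x"])

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The two threads hold different lock objects for the same key,
# so neither lock would exclude the other thread.
distinct = seen[0] is not seen[1]
```

With the real C-implemented defaultdict and threading.Lock this interleaving should not occur, for the reasons given in the answer; the sketch only shows what the race would look like if it could.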

Staring at the source for 3.4.0a4+ (the current development head), defaultdict and threading.Lock are both implemented by C code that doesn't release the GIL. I don't recall whether earlier versions did or didn't, at various times, implement all or parts of defaultdict or threading.Lock in Python.

My suggested alternative code is full of stuff implemented in Python (all threading.Condition methods), but is race-free by design - even if you're using an old version of Python with sets also implemented in Python (the set is only accessed under the protection of the condition variable's lock).

One lock per argument

Without conditions, this seems to be much harder. In the original approach, I believe you need to keep a count of threads wanting to use x, and you need a lock to protect those counts and to protect the dictionary. The best code I've come up with for that is so long-winded that it seems sanest to put it in a context manager. To use, create an argument locker per function that needs it:

farglocker = ArgLocker() # for function `f()`

and then the body of f() can be coded simply:

def f(x):
    with farglocker(x):
        # only one thread at a time can run with argument `x`
        pass

Of course the condition approach could also be wrapped in a context manager. Here's the code:

import threading

class ArgLocker:
    def __init__(self):
        self.xs = dict() # maps x to (lock, count) pair
        self.lock = threading.Lock()

    def __call__(self, x):
        return AllMine(self.xs, self.lock, x)

class AllMine:
    def __init__(self, xs, lock, x):
        self.xs = xs
        self.lock = lock
        self.x = x

    def __enter__(self):
        x = self.x
        with self.lock:
            xlock = self.xs.get(x)
            if xlock is None:
                xlock = threading.Lock()
                xlock.acquire()
                count = 0
            else:
                xlock, count = xlock
            self.xs[x] = xlock, count + 1

        if count: # x was already known - wait for it
            xlock.acquire()
        assert xlock.locked()

    def __exit__(self, *args):
        x = self.x
        with self.lock:
            xlock, count = self.xs[x]
            assert xlock.locked()
            assert count > 0
            count -= 1
            if count:
                self.xs[x] = xlock, count
            else:
                del self.xs[x]
            xlock.release()

So which way is better? Using conditions ;-) That way is "almost obviously correct", but the lock-per-argument (LPA) approach is a bit of a head-scratcher. The LPA approach does have the advantage that when a thread is done with x, the only threads allowed to proceed are those wanting to use the same x; using conditions, the .notify_all() wakes all threads blocked waiting on any argument. But unless there's very heavy contention among threads trying to use the same arguments, this isn't going to matter much: using conditions, the threads woken up that aren't waiting on x stay awake only long enough to see that x in fargs is true, and then immediately block (.wait()) again.
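As mentioned, the condition approach can be wrapped in a context manager too. Here is one way that might look; CondArgLocker is a name I made up, and this is a sketch rather than a drop-in replacement for ArgLocker. It uses contextlib.contextmanager so the claim and release logic sit in one function.

```python
import threading
from contextlib import contextmanager

class CondArgLocker:
    """Hypothetical condition-based counterpart to ArgLocker."""

    def __init__(self):
        self.cond = threading.Condition()
        self.in_use = set()       # arguments currently claimed by some thread

    @contextmanager
    def __call__(self, x):
        with self.cond:
            while x in self.in_use:
                self.cond.wait()
            self.in_use.add(x)
        try:
            yield
        finally:
            # Always give `x` back, even if the body raised.
            with self.cond:
                self.in_use.remove(x)
                self.cond.notify_all()

flocker = CondArgLocker()         # for function `f()`
results = []

def f(x):
    with flocker(x):
        # only one thread at a time can run with argument `x`
        results.append(x)

threads = [threading.Thread(target=f, args=(i % 3,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Usage is the same as with ArgLocker (with flocker(x): ...), and the set only ever holds arguments that are in use right now, so there is nothing to clean up afterwards.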

Tim Peters answered Nov 02 '22 03:11