Removing a semaphore when it has no waits

Here's a concurrency problem:

A string value is used to represent an abstract resource: only one thread at a time is allowed to do work for a given string value, but multiple threads can run concurrently if their string values are different. So far, so simple:

private static readonly Dictionary<String,Object> _locks = new Dictionary<String,Object>();

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {

    Object resourceLock;
    lock( _locks ) {

        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new Object() );
        }
    }

    lock( resourceLock ) {

        EnterCriticalSection( resourceName );
    }
}

But this is suboptimal: the domain of resourceName is unbounded, so _locks could end up containing thousands of entries or more. Once no threads are using a particular resourceName value, its locking object should be removed from the dictionary.

It would be a bug to simply remove a lock object after it has been used, because of the scenario below (the relevant code follows it). Note that all three threads have resourceName = "foo".

  • Thread 1 is at labelC and has just removed its resourceName = "foo" entry from the _locks dictionary, thereby removing resourceLock (#1).
  • Thread 2 is at labelB. Because no other thread is locked on resourceLock (#1), it does not wait at lock( resourceLock ) but continues into EnterCriticalSection.
  • Thread 3 is at labelA. Because resourceName = "foo" is not in _locks (Thread 1 removed it), it adds a new instance to _locks. Since this resourceLock (#2) is a new instance, the lock( resourceLock ) in Thread 3 will not wait for Thread 2; consequently Threads 2 and 3 can both be inside EnterCriticalSection with the same resourceName value.

Code:

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {

labelA:        
    Object resourceLock;
    lock( _locks ) {

        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new Object() );
        }
    }

    try {

labelB:
        lock( resourceLock ) {

            EnterCriticalSection( resourceName );
        }
    }
    finally {

        lock( _locks ) {
            _locks.Remove( resourceName );
        }
labelC: ;
    }

}

I initially solved this with my own little hack:

class CountedLock {
    public Int32 Count;
}

private static readonly Dictionary<String,CountedLock> _locks = new Dictionary<String,CountedLock>();

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {

labelA:        
    CountedLock resourceLock;
    lock( _locks ) {

        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new CountedLock() { Count = 1 } );
        }
        else {
            resourceLock.Count++; // no need for Interlocked.Increment as we're already in a mutex code block
        }
    }

    try {

labelB:
        lock( resourceLock ) {

            EnterCriticalSection( resourceName );
        }
    }
    finally {

        lock( _locks ) {
labelD:
            if( --resourceLock.Count == 0 ) {
                _locks.Remove( resourceName );
            }
        }
labelC: ;
    }

}

This solves the problem. Assuming the three threads are at the same locations as before:

  • Thread 1 did not remove the resourceLock for its resourceName from _locks, because Thread 2 also holds a reference to the same resourceLock as Thread 1; the associated count was 2 when Thread 1 was at labelD (and becomes 1 by the time Thread 1 reaches labelC).
  • Thread 2 continues to enter the critical section because it got there before Thread 3.
  • If Thread 3 (at labelA) reaches TryGetValue while Thread 2 is still in the critical section, it will see that its resourceName = "foo" is still in the _locks dictionary and so gets the same instance as Thread 2; it will therefore wait at labelB for Thread 2 to complete.
  • So this works!

But I imagine you're now thinking:

"So you have a lock with an associated count... sounds like you just re-invented semaphores. Don't re-invent the wheel; use System.Threading.Semaphore or SemaphoreSlim."

Indeed, so I changed my code to use a SemaphoreSlim. A SemaphoreSlim instance has an internal count (exposed as CurrentCount) which is the number of threads still allowed to enter, as opposed to the number of threads currently "inside" the semaphore - the opposite of how my CountedLock worked in the previous example:

private static readonly Dictionary<String,SemaphoreSlim> _locks = new Dictionary<String,SemaphoreSlim>();

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {

labelA:
    SemaphoreSlim resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new SemaphoreSlim( initialCount: 1, maxCount: 1 ) );
        }
    }

labelB:
    resourceLock.Wait(); // this decrements the semaphore's count

    try {
        EnterCriticalSection( resourceName );
    }
    finally {

        lock( _locks ) {
            Int32 count = resourceLock.Release(); // this increments the semaphore's count
            if( count > 0 ) {
                _locks.Remove( resourceName );
                resourceLock.Dispose();
            }
labelC: ;
        }
    }
}

...but spot the bug!

Consider this scenario:

  • Thread 1 is at labelC and has just removed and disposed its resourceLock (#1) instance.
  • Thread 2 is at labelB (immediately prior to calling Wait). It had acquired a reference to the same SemaphoreSlim resourceLock (#1) instance as Thread 1. Because Wait is called after the thread leaves the lock( _locks ) block under labelA, there is a small but real window of opportunity in which Thread 2 calls Wait() on the disposed resourceLock (#1) (ignoring the possible ObjectDisposedException) while Thread 3 (currently at labelA) runs TryGetValue and instantiates a new SemaphoreSlim (#2) for the same resourceName. Because Thread 2 and Thread 3 hold different semaphore instances, they can both potentially enter the critical section simultaneously.

You might suggest:

You should just find a way to decrement the semaphore while you're inside the lock( _locks ) block under labelA

...except the SemaphoreSlim class does not expose any Decrement method. You can, however, call .Wait(0) so that it returns immediately, so my code could look like this:

[...]
labelA:
    SemaphoreSlim resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new SemaphoreSlim( initialCount: 1, maxCount: 1 ) );
        }
        resourceLock.Wait( 0 );
    }

labelB:
    resourceLock.Wait();
[...]

...except this won't work. The documentation for Wait(Int32) states (emphasis mine):

If a thread or task is blocked when calling Wait(Int32), and the time-out interval specified by millisecondsTimeout expires, the thread or task doesn’t enter the semaphore, and the CurrentCount property isn’t decremented.

...so much for that theory. Even if it did work, calling Wait twice on the same thread would presumably decrement the count twice, not once.

So is it possible to have a critical-section guarded by mutexes that somehow "know" when they're not needed anymore?

Asked Sep 18 '16 by Dai


3 Answers

I would actually stick with your simpler, count-based solution, rather than SemaphoreSlim, since you've already implemented it anyway. While called "slim", SemaphoreSlim is nevertheless less lightweight than your simple counter. As you are aware, using the semaphore actually makes the code slightly less performant, and a bit more complicated. If nothing else, if it takes more time to convince oneself that this version actually works, then perhaps it's not the better version.

So, perhaps you are reinventing the wheel, but SemaphoreSlim is a general-purpose semaphore, with functionality you don't fully need. Even Microsoft reinvented the wheel by adding SemaphoreSlim to the BCL, when Semaphore was already there.

On the other hand, if you feel like contention might be an issue with your global lock, you might try a lockless approach instead. Most likely you won't have such issues, but if you really think this will be called thousands of times all over your code, you could opt for something like:

private static readonly ConcurrentDictionary<string, CountedLock> _locks 
    = new ConcurrentDictionary<string, CountedLock>();

public static void DoSomethingMutuallyExclusiveByName(string resourceName)
{
    CountedLock resourceLock;

    // we must use a loop to avoid incrementing a stale lock object
    var spinWait = new SpinWait();
    while (true)
    {
        resourceLock = _locks.GetOrAdd(resourceName, i => new CountedLock());
        resourceLock.Increment();

        // check that the instance wasn't removed in the meantime
        if (resourceLock == _locks.GetOrAdd(resourceName, i => new CountedLock()))
            break;

        // otherwise retry
        resourceLock.Decrement();
        spinWait.SpinOnce();
    }

    try
    {
        lock (resourceLock)
        {
            // EnterCriticalSection(resourceName);
        }
    }
    finally
    {
        if (resourceLock.Decrement() <= 0)
            _locks.TryRemove(resourceName, out resourceLock);
    }
}

With CountedLock also modified to use the Interlocked class:

class CountedLock
{
    Int32 _count;
    public int Increment() => Interlocked.Increment(ref _count);
    public int Decrement() => Interlocked.Decrement(ref _count);
}

Either way, I would probably reorganize the code to be generic, and to (ab)use the IDisposable interface to allow you to simply wrap the call in a single using block.
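For illustration, here is a minimal sketch of what that could look like, reusing the counting idea from the question. The type and member names (KeyedLocker, Acquire, Releaser) are invented for this example; it is a sketch, not a drop-in implementation.

using System;
using System.Collections.Generic;
using System.Threading;

// Sketch only: the counting scheme is the same as the question's CountedLock
// approach; the IDisposable return value just moves the decrement into
// Dispose so callers can wrap the whole thing in a using block.
public sealed class KeyedLocker<TKey>
{
    private sealed class Entry
    {
        public readonly Object Gate = new Object();
        public Int32 RefCount;
    }

    private readonly Dictionary<TKey,Entry> _entries = new Dictionary<TKey,Entry>();

    public IDisposable Acquire(TKey key)
    {
        Entry entry;
        lock( _entries )
        {
            if( !_entries.TryGetValue( key, out entry ) )
            {
                _entries.Add( key, entry = new Entry() );
            }
            entry.RefCount++; // protected by lock( _entries ), no Interlocked needed
        }

        Monitor.Enter( entry.Gate ); // blocks until this thread owns the per-key gate
        return new Releaser( this, key, entry );
    }

    private void Release(TKey key, Entry entry)
    {
        Monitor.Exit( entry.Gate );
        lock( _entries )
        {
            if( --entry.RefCount == 0 )
            {
                _entries.Remove( key ); // last interested thread: drop the entry
            }
        }
    }

    private sealed class Releaser : IDisposable
    {
        private readonly KeyedLocker<TKey> _owner;
        private readonly TKey _key;
        private readonly Entry _entry;
        private Boolean _disposed;

        public Releaser(KeyedLocker<TKey> owner, TKey key, Entry entry)
        {
            _owner = owner;
            _key   = key;
            _entry = entry;
        }

        public void Dispose()
        {
            if( _disposed ) return;
            _disposed = true;
            _owner.Release( _key, _entry );
        }
    }
}

Callers would then write something like using( locker.Acquire( resourceName ) ) { EnterCriticalSection( resourceName ); }, and the per-key entry is dropped automatically when the last holder disposes its scope.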

Answered by Groo


It seems calling Wait( 0 ) is actually the solution. Indeed, it does not decrement the semaphore's count if the semaphore cannot be entered immediately, so it's just a matter of calling Wait again with an infinite timeout after we leave the lock( _locks ) mutex, like so:

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {

    Boolean hasLock;
    SemaphoreSlim resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new SemaphoreSlim( initialCount: 1, maxCount: 1 ) );
        }
        hasLock = resourceLock.Wait( 0 ); // This call will not block, and it decrements the semaphore's count only if it succeeds (returns true).
    }

    if( !hasLock ) resourceLock.Wait(); // This will block, but will only be called if the previous call to Wait(0) indicated this thread did not enter the semaphore.

    try {
        EnterCriticalSection( resourceName );
    }
    finally {

        lock( _locks ) {
            Int32 count = resourceLock.Release(); // this increments the semaphore's count
            if( count > 0 ) {
                _locks.Remove( resourceName );
                resourceLock.Dispose();
            }
        }
    }
}

I think this solves the problem - I'm hard-pressed to see a race condition in this version. Because the Wait(0) call is inside the lock( _locks ) mutex, at least one thread for a given resourceName will hold the semaphore immediately after the closing brace of the first lock, so a semaphore instance will never be removed from the _locks collection while any thread is waiting on it or is inside the critical section.
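For what it's worth, the Wait(0) behaviour this relies on is easy to check with a throwaway console sketch (not part of the solution itself):

using System;
using System.Threading;

static class WaitZeroCheck
{
    static void Main()
    {
        var s = new SemaphoreSlim( initialCount: 1, maxCount: 1 );

        Console.WriteLine( s.Wait( 0 ) );      // True  - count was 1, so we entered and the count was decremented
        Console.WriteLine( s.CurrentCount );   // 0

        Console.WriteLine( s.Wait( 0 ) );      // False - count is 0, returns immediately, no decrement
        Console.WriteLine( s.CurrentCount );   // 0

        s.Release();
        Console.WriteLine( s.CurrentCount );   // 1
    }
}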

Answered by Dai


I don't know, sometimes it might be better to reinvent the wheel.

I'm a fan of the Monitor-based synchronization constructs (in fact, all the Slim classes use such constructs in one way or another). The first thing that comes to my mind is something simple like this:

public class CriticalSectionSlim<TKey>
{
    private readonly HashSet<TKey> lockSet = new HashSet<TKey>();
    public void Enter(TKey key)
    {
        lock (lockSet)
        {
            while (!lockSet.Add(key))
                Monitor.Wait(lockSet);
        }
    }
    public void Exit(TKey key)
    {
        lock (lockSet)
        {
            lockSet.Remove(key);
            Monitor.PulseAll(lockSet);
        }
    }
}
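A minimal usage sketch, reusing the question's EnterCriticalSection placeholder (the _criticalSection field name is just for illustration); Exit belongs in a finally block so an exception inside the critical section can't leave the key locked forever:

private static readonly CriticalSectionSlim<String> _criticalSection = new CriticalSectionSlim<String>();

public static void DoSomethingMutuallyExclusiveByName(String resourceName)
{
    _criticalSection.Enter( resourceName );
    try
    {
        EnterCriticalSection( resourceName );
    }
    finally
    {
        _criticalSection.Exit( resourceName );
    }
}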

The potential performance issues are that (A) waiters for different keys all share the same monitor, and (B) the pulsing is indiscriminate: PulseAll wakes every waiter on every Exit.

A slightly more complicated version that solves (B), and thus minimizes the effect of (A), could be as follows:

public class CriticalSectionSlim<TKey>
{
    const int EnteredFlag = int.MinValue;
    private readonly Dictionary<TKey, int> lockInfo = new Dictionary<TKey, int>();
    public void Enter(TKey key)
    {
        lock (lockInfo)
        {
            int info;
            if (lockInfo.TryGetValue(key, out info))
            {
                if ((info & EnteredFlag) != 0)
                {
                    lockInfo[key] = info + 1;
                    do
                    {
                        Monitor.Wait(lockInfo);
                        info = lockInfo[key];
                    }
                    while ((info & EnteredFlag) != 0);
                    info--;
                }
            }
            lockInfo[key] = EnteredFlag | info;
        }
    }
    public void Exit(TKey key)
    {
        lock (lockInfo)
        {
            int waitCount = lockInfo[key] & ~EnteredFlag;
            if (waitCount == 0)
                lockInfo.Remove(key);
            else
            {
                lockInfo[key] = waitCount;
                Monitor.PulseAll(lockInfo);
            }
        }
    }
}

For each key we keep a flag indicating whether it has been entered, plus the count of waiters, if any. This allows us to avoid pulsing on exit when there are no waiters.
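To make the encoding concrete, here is a small worked example (the values are arbitrary); because EnteredFlag is Int32.MinValue, only the sign bit is used for the flag and the remaining bits hold the waiter count:

const Int32 EnteredFlag = Int32.MinValue;        // only the sign bit is set

Int32 state = EnteredFlag | 2;                   // key is entered and 2 threads are waiting

Boolean isEntered = (state & EnteredFlag) != 0;  // true -> a new Enter() must wait
Int32 waitCount   = state & ~EnteredFlag;        // 2    -> Exit() clears the flag and pulses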

Answered by Ivan Stoev