Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Conditionally lock resources

I have a consumer class that is a Singleton. Is it possible in any way to conditionally lock resources. Currently the external resources is "globally" locked with the AsyncEx (https://github.com/StephenCleary/AsyncEx) framework. But I would prefer to lock it based on the Key in the input message so messages with different Keys should not lock the resource.

public class Consumer
{
    private readonly AsyncLock _mutex = new AsyncLock();

    protected async Task Process(Message msg)
    {
       **UNBLOCKED CODE**

       using (await _mutex.LockAsync())
       {
          await GetData(msg.Key);
       }

       **UNBLOCKED CODE**
    }
}

Any suggestions?

like image 421
Rikard Avatar asked Feb 12 '26 01:02

Rikard


1 Answers

Instead of having just one AsyncLock, create a dictionary full of them. The message key is the key to the AsyncLock in the dictionary. As long as there is just one message with a given key processing at a time, there won't be any blocking. I am further assuming that your message key is a string.

private readonly Dictionary<string, AsyncLock> _mutexes = new Dictionary<string, AsyncLock>();

private AsyncLock GetMutex(string key)
{
    lock (_mutexes)
    {
        AsyncLock mutex;
        if (!_mutexes.TryGetValue(key, out mutex))
        {
            // no mutex yet, create a new one
            mutex = new AsyncLock();
            _mutexes.Add(key, mutex);
        }
        return mutex;
    }
}

....
protected async Task Process(Message msg)
{
    using (await GetMutex(msg.Key).LockAsync())
    {
        ...
    }
}

Please note that in all likelihood the GetMutex method can be further optimized to avoid a lock operation if the mutex for a key already exists, e.g. by first checking with ContainsKey, if not then lock and add. But I'm not sure if that optimization is 100% threadsafe so I did not include it here.

Inspired by Jason's ConcurrentDictionary comments I decided to provide the code for the ConcurrentDictionary version of GetMutex. I do not recommend to attempt what I crossed out above about optimizing access to a non-threadsafe dictionary.

ConcurrentDictionary<string, AsyncLock> _mutexes = new ConcurrentDictionary<string, AsyncLock>();

private AsyncLock GetMutexes(string key)
{
    return _mutexes.GetOrAdd(key, s => { return new AsyncLock(); });
}
like image 120
Christoph Avatar answered Feb 13 '26 17:02

Christoph



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!