Lock handler for arbitrary keys

I have code that implements a "lock handler" for arbitrary keys. Given a key, it ensures that only one thread at a time can process that key or any key equal to it (processing here means calling externalSystem.process(key)).

So far, I have code like this:

public class MyHandler {
    private final SomeWorkExecutor someWorkExecutor;
    private final ConcurrentHashMap<Key, Lock> lockMap = new ConcurrentHashMap<>();

    public void handle(Key key) {
        // This can lead to OOM as it creates locks without removing them
        Lock keyLock = lockMap.computeIfAbsent( 
            key, (k) -> new ReentrantLock()
        );
        keyLock.lock();
        try {
            someWorkExecutor.process(key);
        } finally {
            keyLock.unlock();
        }
    }
}

I understand that this code can lead to an OutOfMemoryError because nothing ever removes entries from the map.

I am thinking about how to build a map that holds a limited number of elements. When the limit is exceeded, the least recently accessed element should be replaced with the new one (this code should synchronize on the oldest element as a monitor). But I don't know how to get a callback that tells me the limit has been exceeded.
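
One way to get such a callback, as a rough sketch: java.util.LinkedHashMap, constructed in access order, calls its protected removeEldestEntry method after each put, and returning true there evicts the least recently accessed entry. The class name BoundedLockMap and the maxEntries parameter are hypothetical. LinkedHashMap is not thread safe, so the sketch guards it with synchronized. Note that evicting a lock that some thread still holds would again allow two locks to exist for one key, so this only illustrates the callback and is not a complete solution.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper (not from the question): an access-ordered map of
// locks that never holds more than maxEntries entries.
class BoundedLockMap<K> {
    private final Map<K, Lock> locks;

    BoundedLockMap(int maxEntries) {
        // accessOrder = true: the eldest entry is the least recently accessed one.
        this.locks = new LinkedHashMap<K, Lock>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Lock> eldest) {
                // Invoked by put after a new entry is inserted;
                // returning true removes the eldest entry.
                return size() > maxEntries;
            }
        };
    }

    synchronized Lock lockFor(K key) {
        Lock lock = locks.get(key); // get counts as an access
        if (lock == null) {
            lock = new ReentrantLock();
            locks.put(key, lock);   // may trigger eviction of the eldest entry
        }
        return lock;
    }

    synchronized int size() {
        return locks.size();
    }

    synchronized boolean contains(K key) {
        return locks.containsKey(key); // containsKey does not count as an access
    }
}
```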

Please share your thoughts.

P.S.

I reread the task and now I see a constraint: the handle method cannot be invoked by more than 8 threads. I don't know how that can help me, but I mention it just in case.

P.S.2

@Boris the Spider suggested a nice and simple solution:

} finally {
      lockMap.remove(key);
      keyLock.unlock();
}

But Boris later noticed that this code is not thread safe, because it breaks the guarantee.
Consider 3 threads invoked with equal keys:

  1. Thread#1 acquires the lock and is now just before lockMap.remove(key);
  2. Thread#2 is invoked with an equal key, so it waits for thread#1 to release the lock.
  3. Then thread#1 executes lockMap.remove(key);. After this, thread#3 invokes handle. It finds no lock for this key in the map, so it creates a new lock and acquires it.
  4. Thread#1 releases the lock, and thread#2 acquires it.
    Thus thread#2 and thread#3 can run in parallel for equal keys, which must not be allowed.
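
The core of this interleaving can be replayed on a single thread, since the problem is simply that two different lock objects end up existing for one key. A small sketch (the class and method names are made up for illustration, and String stands in for Key):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: replays the steps above one after another.
class RemoveBeforeUnlockRace {
    // Returns true if "thread #3" ends up with a different lock object
    // than the one "thread #2" is still waiting on.
    static boolean twoLocksForOneKey() {
        ConcurrentHashMap<String, Lock> lockMap = new ConcurrentHashMap<>();
        String key = "k";

        // Steps 1 and 2: threads #1 and #2 both look up the same lock.
        Lock seenByThread1 = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
        Lock seenByThread2 = lockMap.computeIfAbsent(key, k -> new ReentrantLock());

        // Step 3: thread #1 removes the mapping before unlocking ...
        lockMap.remove(key);
        // ... and thread #3 finds no mapping, so it creates a fresh lock.
        Lock seenByThread3 = lockMap.computeIfAbsent(key, k -> new ReentrantLock());

        return seenByThread1 == seenByThread2 && seenByThread2 != seenByThread3;
    }
}
```

Because threads #2 and #3 synchronize on different objects, nothing prevents them from processing the same key at the same time.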

To avoid this situation, before removing the map entry we would have to prevent any thread from acquiring the lock until every thread in the wait set has acquired and released it. That looks like fairly complicated synchronization, and it would make the algorithm slow. Maybe we should instead clear the map from time to time, when its size exceeds some limit.

I have spent a lot of time on this, but unfortunately I have no idea how to achieve it.

asked Jan 27 '17 16:01 by gstackoverflow


2 Answers

You don't need to try to limit the size to some arbitrary value - as it turns out, you can accomplish this kind of "lock handler" idiom while only storing exactly the number of keys currently locked in the map.

The idea is to use a simple convention: successfully adding the mapping to the map counts as the "lock" operation, and removing it counts as the "unlock" operation. This neatly avoids the issue of removing a mapping while some thread still has it locked and other race conditions.

At this point, the value in the mapping is only used to block other threads who arrive with the same key and need to wait until the mapping is removed.

Here's an example [1] with CountDownLatch rather than Lock as the map value:

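// Note the value type: the map now holds latches rather than locks
private final ConcurrentMap<Key, CountDownLatch> lockMap = new ConcurrentHashMap<>();

public void handle(Key key) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    // try to acquire the lock by inserting our latch as a
    // mapping for key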
    while(true) {
        CountDownLatch existing = lockMap.putIfAbsent(key, latch);
        if (existing != null) {
            // there is an existing key, wait on it
            existing.await();
        } else {
            break;
        }
    }

    try {
        externalSystem.process(key);
    } finally {
        lockMap.remove(key);
        latch.countDown();
    }
}

Here, the lifetime of the mapping is only as long as the lock is held. The map will never have more entries than there are concurrent requests for different keys.
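
A small harness can check both claims (mutual exclusion per key, and an empty map once all calls finish). This is only a sketch under assumptions: the class name LatchHandlerCheck is made up, String stands in for Key, and a short sleep plus two counters stand in for externalSystem.process(key).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test harness around the latch-based handle method.
class LatchHandlerCheck {
    private final ConcurrentHashMap<String, CountDownLatch> lockMap = new ConcurrentHashMap<>();
    private final AtomicInteger inside = new AtomicInteger();    // threads currently "processing"
    private final AtomicInteger maxInside = new AtomicInteger(); // highest value of inside ever seen

    void handle(String key) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CountDownLatch existing;
        // Inserting our latch is the "lock" operation; wait while someone else holds it.
        while ((existing = lockMap.putIfAbsent(key, latch)) != null) {
            existing.await();
        }
        try {
            // Stand-in for externalSystem.process(key).
            maxInside.accumulateAndGet(inside.incrementAndGet(), Math::max);
            Thread.sleep(1);
            inside.decrementAndGet();
        } finally {
            lockMap.remove(key); // removing the mapping is the "unlock" operation
            latch.countDown();   // wake the threads waiting on our latch
        }
    }

    // Runs the given number of threads against a single key and returns
    // { maximum observed concurrency, map size after all calls finished }.
    static int[] run(int threads, int callsPerThread) {
        LatchHandlerCheck h = new LatchHandlerCheck();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) h.handle("same-key");
                return null;
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { h.maxInside.get(), h.lockMap.size() };
    }
}
```

Calling LatchHandlerCheck.run(8, 20) mirrors the 8-thread limit from the question. A passing run does not prove the absence of races, but a maximum concurrency above 1 or a non-empty map would expose a bug.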

The difference from your approach is that the mappings are not "re-used": each handle call will create a new latch and mapping. Since you are already doing expensive atomic operations, this isn't likely to be much of a slowdown in practice. Another downside is that with many waiting threads, all are woken when the latch counts down, but only one will succeed in putting a new mapping in and hence acquiring the lock; the rest go back to sleep on the new latch.

You could build another version of this which re-uses the mappings when threads come along and wait on an existing mapping. Basically, the unlocking thread just does a "handoff" to one of the waiting threads. Only one mapping will be used for an entire set of threads that wait on the same key; it is handed off to each one in sequence. The size is still bounded because once no more threads are waiting on a given mapping, it is removed.

To implement that, you replace the CountDownLatch with a map value that can count the number of waiting threads. When a thread does the unlock, it first checks to see if any threads are waiting, and if so wakes one to do the handoff. If no threads are waiting, it "destroys" the object (i.e., sets a flag that the object is no longer in the mapping) and removes it from the map.

You need to do the above manipulations under a proper lock, and there are a few tricky details. In practice I find the short and sweet example above works great.
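
For illustration, here is a simplified variation on that idea, not the exact handoff design described above. Each map value pairs a ReentrantLock with a count of the threads holding or waiting for it. The count is only changed inside ConcurrentHashMap.compute, which runs atomically per key, and the mapping is removed when the count drops to zero. Waking the next waiter is left to the ReentrantLock itself. The names KeyedLock, Entry and users are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class: per-key locks whose map entries disappear once
// no thread holds or waits for them.
class KeyedLock<K> {
    private static final class Entry {
        final ReentrantLock lock = new ReentrantLock();
        int users; // threads holding or waiting; only touched inside compute()
    }

    private final ConcurrentHashMap<K, Entry> entries = new ConcurrentHashMap<>();

    void lock(K key) {
        // Register interest atomically, creating the entry if it is missing.
        Entry entry = entries.compute(key, (k, e) -> {
            Entry current = (e == null) ? new Entry() : e;
            current.users++;
            return current;
        });
        entry.lock.lock(); // may block; the entry cannot be removed while users > 0
    }

    void unlock(K key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            throw new IllegalMonitorStateException("key is not locked");
        }
        entry.lock.unlock(); // throws if the caller does not own the lock
        // Deregister; returning null from compute removes the mapping.
        entries.compute(key, (k, e) -> (--e.users == 0) ? null : e);
    }

    int size() {
        return entries.size();
    }
}
```

In handle, this would be used as locks.lock(key), then the process call inside try, then locks.unlock(key) inside finally.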


[1] Written on the fly, not compiled and not tested, but the idea works.

answered Oct 01 '22 18:10 by BeeOnRope


You could rely on the method compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) to synchronize calls to your method process for a given key. You no longer need Lock as the value type of your map, because you don't rely on it anymore.

The idea is to rely on the internal locking mechanism of your ConcurrentHashMap to execute your method. This allows threads to execute the process method in parallel for keys whose hashes do not fall into the same bin. This is equivalent to the approach based on striped locks, except that you don't need an additional third-party library.

The striped-locks approach is interesting because it is very light in terms of memory footprint: you only need a limited number of locks, so the memory needed for your locks is known and never changes. That is not the case for approaches that use one lock per key (as in your question), so striped locks are generally the better, recommended choice for this kind of need.
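
To make the idea of striped locks concrete, here is a minimal plain-JDK sketch: a fixed array of locks, with each key mapped to one of them by its hash. The class name StripedLocks and its methods are hypothetical; libraries such as Guava ship a ready-made Striped class for the same purpose.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class: a fixed pool of locks shared by all keys.
class StripedLocks {
    private final Lock[] stripes;

    StripedLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // Equal keys always map to the same lock. Unequal keys may share a
    // lock, which costs some parallelism but never breaks correctness.
    Lock lockFor(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 16); // mix the high bits into the low bits
        return stripes[Math.floorMod(h, stripes.length)];
    }
}
```

Memory use is fixed by stripeCount no matter how many distinct keys are seen. The trade-off is that two unrelated keys landing on the same stripe will wait for each other.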

So your code could be something like this:

// This creates a ConcurrentHashMap with an initial table size of 16
// bins by default. If that is too many or too few, you may provide an
// initialCapacity and loadFactor to get the table size you expect and so
// increase or reduce the concurrency level of your map.
// NB: We don't care much about the type of the value, so I arbitrarily
// used Void, but it could be any type, such as plain Object
private final ConcurrentMap<Key, Void> lockMap = new ConcurrentHashMap<>();

public void handle(Key lockKey) {
    // Execute the method process through the remapping Function
    lockMap.compute(
        lockKey,
        (key, value) -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

NB 1: As we always return null, the map will always be empty, so you will never run out of memory because of this map.
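
The empty-map property is easy to check in isolation. In this sketch the class name ComputeHandler is made up, and a java.util.function.Consumer stands in for someWorkExecutor.process. One caveat worth keeping in mind: the ConcurrentHashMap documentation says the function passed to compute should be short and simple, because other threads updating the map may be blocked while it runs, so a slow process call also delays unrelated keys that share its bin.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical wrapper around the compute-based handle method.
class ComputeHandler<K> {
    // The value type is irrelevant because no value is ever stored.
    private final ConcurrentMap<K, Boolean> lockMap = new ConcurrentHashMap<>();
    private final Consumer<K> processor; // stand-in for someWorkExecutor::process

    ComputeHandler(Consumer<K> processor) {
        this.processor = processor;
    }

    void handle(K lockKey) {
        lockMap.compute(lockKey, (key, value) -> {
            // Runs atomically with respect to other compute calls for an equal key.
            processor.accept(key);
            // Returning null means no mapping is stored for this key.
            return null;
        });
    }

    int size() {
        return lockMap.size();
    }
}
```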

NB 2: As we never assign a value to a given key, note that it could also be done using the method computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction):

public void handle(Key lockKey) {
    // Execute the method process through the mapping Function
    lockMap.computeIfAbsent(
        lockKey,
        key -> {
            // Execute the process method under the protection of the
            // lock of the bin of hashes corresponding to the key
            someWorkExecutor.process(key);
            // Returns null to keep the Map empty
            return null;
        }
    );
}

NB 3: Make sure that your method process never calls the method handle for any key, as you would end up with infinite loops (same key) or deadlocks (other keys taken in no consistent order). For example, if one thread calls handle(key1) and process internally calls handle(key2), while another thread calls handle(key2) in parallel and process internally calls handle(key1), you will get a deadlock whatever the approach used. This behavior is not specific to this approach; it will occur with any approach.

answered Oct 01 '22 18:10 by Nicolas Filotto