Kotlin concurrency for ConcurrentHashMap

I am trying to support concurrency on a hashmap that gets periodically cleared. I have a cache that stores data for a period of time. Every 5 minutes, the data in this cache is sent to the server. Once I flush, I want to clear the cache. The problem is that while I am flushing, another thread could write to this map under an existing key. How would I go about making this process thread safe?

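import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

data class A(val a: AtomicLong, val b: AtomicLong) {
    fun changeA() {
        a.incrementAndGet()
    }
}

class Flusher {
    // Typed as ConcurrentHashMap (not read-only Map) so getOrPut and clear compile
    private val cache = ConcurrentHashMap<String, A>()
    private val lock = Any()

    fun retrieveA(key: String): A {
        synchronized(lock) {
            // Initial counter values of zero are an assumption
            return cache.getOrPut(key) { A(AtomicLong(), AtomicLong()) }
        }
    }

    fun flush() {
        synchronized(lock) {
            // send data to network request
            cache.clear()
        }
    }
}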

// There are multiple classes like CacheChanger
class CacheChanger(private val flusher: Flusher) {
    fun incrementData() {
        flusher.retrieveA("x").changeA()
    }
}

I am worried that the above cache is not properly synchronized. Are there better/right ways to lock this cache so that I don't lose data? Should I create a deep copy of the cache and then clear it?

Since the data above could be modified by another changer while it is being flushed, could that not lead to problems?

LateNightDev asked Jun 21 '20 21:06



1 Answer

You can get rid of the lock.

In the flush method, instead of reading the entire map (e.g. through an iterator) and then clearing it, remove each element one by one.

I'm not sure whether you can use the iterator's remove method for this, but you can take the key set, iterate over it, and invoke cache.remove() for each key. This returns the stored value and removes it from the cache atomically.
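A minimal sketch of that key-by-key drain, assuming a simplified cache that maps each key directly to an AtomicLong. The top-level `cache` and the `drain` function are hypothetical names for illustration, not part of the question's code:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the question's cache: key -> counter.
val cache = ConcurrentHashMap<String, AtomicLong>()

// Removes entries one at a time and returns the values that were removed.
fun drain(): Map<String, Long> {
    val drained = HashMap<String, Long>()
    // The key view is weakly consistent, so iterating while other
    // threads write does not throw ConcurrentModificationException.
    for (key in cache.keys) {
        // remove() atomically unmaps the key and hands back its value;
        // null means another thread removed the entry first.
        val removed = cache.remove(key) ?: continue
        drained[key] = removed.get()
    }
    return drained
}
```

Note that a thread which fetched a counter just before it was removed can still increment it after `removed.get()` has been read, so this sketch alone does not prevent lost updates.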

The tricky part is how to make sure that the object of class A won't be modified just before it is sent over the network. You can do it as follows:

When you get some object x through retrieveA and modify it, you need to make sure it is still in the cache. Simply invoke retrieveA one more time. If you get exactly the same object, everything is fine. If you get a different one, the original object was removed and sent over the network, but you don't know whether the modification was sent along with it or only the state before the modification. Still, I think in your case you can simply repeat the whole process (apply the change and check whether the objects are the same). But it depends on the specifics of your application.
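The re-check could look roughly like this. It is a sketch under assumptions: `Counters` and `incrementWithRecheck` are hypothetical names, the cache value is reduced to a single AtomicLong, and the flusher is assumed to remove an entry before reading its value:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical simplified cache; stands in for Flusher without the lock.
class Counters {
    private val cache = ConcurrentHashMap<String, AtomicLong>()

    fun retrieve(key: String): AtomicLong =
        cache.computeIfAbsent(key) { AtomicLong() }

    // What a flusher would call for each key before sending the value.
    fun remove(key: String): AtomicLong? = cache.remove(key)
}

fun incrementWithRecheck(counters: Counters, key: String) {
    while (true) {
        val first = counters.retrieve(key)
        first.incrementAndGet()
        // Same instance still cached: the removal (if any) comes later,
        // so a flusher that reads after removing will see the increment.
        if (counters.retrieve(key) === first) return
        // Different instance: the old one was flushed in between and we
        // cannot tell whether our increment was included. Repeating may
        // count this change twice.
    }
}
```

Identity comparison (`===`) matters here; a data class such as A compares by value with `==`, which would not detect that the object was replaced.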

If you don't want to risk incrementing twice, then when sending the data over the network you'll have to read the content of the counter a, store it in a local variable, and decrease a by that amount (usually it will drop to zero). Then in the CacheChanger, when you get a different object from the second retrieveA call, you can check whether the value is zero (your modification was taken into account) or non-zero, which means your modification came a fraction of a second too late and you'll have to repeat the process.

You could also replace incrementAndGet with compareAndSet (a compare-and-swap), but this could yield slightly worse performance. In this approach, instead of incrementing, you try to swap in a value that is greater by one. Before sending over the network, you try to swap the value to -1 to mark the object as invalid. If that second swap fails, someone has changed the value concurrently, so you read it again in order to send the freshest value and repeat in a loop (breaking out only once the swap to -1 succeeds). The swap to a value greater by one is also repeated in a loop until it succeeds. If it fails, either somebody else swapped in some greater value or the Flusher swapped in -1. In the latter case you know that you have to call retrieveA one more time to get a new object.
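Here is one way the compare-and-set variant might be put together. Treat it as a sketch: `SealableCounter`, `CasFlusher`, and `SEALED` are hypothetical names, the cache value is reduced to one counter, and `flush` returns the drained values instead of performing a network call:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

const val SEALED = -1L  // marks a counter that has already been flushed

class SealableCounter {
    private val value = AtomicLong(0)

    // Returns false if the flusher already sealed this counter.
    fun tryIncrement(): Boolean {
        while (true) {
            val current = value.get()
            if (current == SEALED) return false
            if (value.compareAndSet(current, current + 1)) return true
            // CAS lost a race with another writer or the flusher; re-read.
        }
    }

    // Swaps in SEALED and returns the last value seen before sealing.
    fun seal(): Long {
        while (true) {
            val current = value.get()
            if (value.compareAndSet(current, SEALED)) return current
        }
    }
}

class CasFlusher {
    private val cache = ConcurrentHashMap<String, SealableCounter>()

    fun increment(key: String) {
        while (true) {
            val counter = cache.computeIfAbsent(key) { SealableCounter() }
            if (counter.tryIncrement()) return
            // Sealed counters are removed from the map before sealing,
            // so the next lookup yields a fresh one.
        }
    }

    // Removes and seals every entry; the returned map is what would be sent.
    fun flush(): Map<String, Long> {
        val snapshot = HashMap<String, Long>()
        for (key in cache.keys) {
            val counter = cache.remove(key) ?: continue
            snapshot[key] = counter.seal()
        }
        return snapshot
    }
}
```

Because an increment either lands before the seal (and is included in the returned value) or fails and is retried on a new counter, each change should be counted exactly once in this sketch.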

ciamej answered Oct 17 '22 00:10