
Missing updates with locks and ConcurrentHashMap

I have a scenario where I have to maintain a Map which can be populated by multiple threads, each modifying their respective List (unique identifier/key being the thread name), and when the list size for a thread exceeds a fixed batch size, we have to persist the records to the database.

Aggregator class

private volatile ConcurrentHashMap<String, List<T>>  instrumentMap = new ConcurrentHashMap<String, List<T>>();
private ReentrantLock lock ;

public void addAll(List<T> entityList, String threadName) {
    try {
        lock.lock();
        List<T> instrumentList = instrumentMap.get(threadName);
        if(instrumentList == null) {
            instrumentList = new ArrayList<T>(batchSize);
            instrumentMap.put(threadName, instrumentList);
        }

        if(instrumentList.size() >= batchSize -1){
            instrumentList.addAll(entityList);
            recordSaver.persist(instrumentList); 
            instrumentList.clear();
        } else {
            instrumentList.addAll(entityList);  
        }
    } finally {
        lock.unlock();
    }

}

There is one more thread that runs every 2 minutes (using the same lock) and persists all the records in the Map, to make sure that something is persisted every 2 minutes and that the map does not get too big.

if (/* some condition */) {
    Thread.sleep(/* 2 minutes */);
    aggregator.getLock().lock();
    List<T> instrumentList = instrumentMap.values().stream().flatMap(x -> x.stream()).collect(Collectors.toList());
    if (instrumentList.size() > 0) {
        saver.persist(instrumentList);
        instrumentMap.values().parallelStream().forEach(x -> x.clear());
        aggregator.getLock().unlock();
    }
}

This solution works fine in almost every scenario that we tested, except that sometimes some of the records go missing, i.e. they are not persisted at all, although they were added to the Map correctly.

My questions are:

  1. What is the problem with this code?
  2. Is ConcurrentHashMap not the best solution here?
  3. Does the List that is used with the ConcurrentHashMap have an issue?
  4. Should I use the compute method of ConcurrentHashMap here (no need I think, as ReentrantLock is already doing the same job)?
Amit asked Feb 13 '19 05:02



1 Answer

The answer provided by @Slaw in the comments did the trick. We were letting the instrumentList instance escape in a non-synchronized way, i.e. the list was being accessed and modified without any synchronization. Passing a copy of the list to the downstream methods fixed the problem.

The following lines of code are where the issue was happening:

recordSaver.persist(instrumentList);
instrumentList.clear();

Here we allow the instrumentList instance to escape in a non-synchronized way: it is passed to another class (recordSaver.persist), which is supposed to act on it, but we clear the same list on the very next line (in the Aggregator class), and none of this is synchronized between the two classes. The state of the list as seen by the record saver therefore can't be predicted... a really stupid mistake.
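To make the hazard concrete, here is a minimal single-threaded sketch. It assumes a saver that holds on to the list it is given instead of copying it. EscapeDemo and DeferredSaver are hypothetical names used only for illustration; the real recordSaver is not shown in the question, so whether it behaves this way is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EscapeDemo {
    /** Hypothetical saver that keeps the caller's list and writes it later. */
    static class DeferredSaver {
        private List<String> pending = new ArrayList<>();

        void persist(List<String> records) {
            // Keeps a reference to the caller's list; no copy is made.
            pending = records;
        }

        int flush() {
            // Number of records that would actually be written.
            return pending.size();
        }
    }

    public static void main(String[] args) {
        DeferredSaver saver = new DeferredSaver();
        List<String> batch = new ArrayList<>(Arrays.asList("a", "b", "c"));

        saver.persist(batch);
        batch.clear(); // also empties the list the saver is holding

        System.out.println(saver.flush()); // prints 0
    }
}
```

If the saver does its work on another thread, the same aliasing turns into a race: depending on timing it may see all, some, or none of the records.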

We fixed the issue by passing a cloned copy of instrumentList to the recordSaver.persist(...) method. This way, instrumentList.clear() has no effect on the list that recordSaver holds for further operations.
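A minimal sketch of what that fix could look like is below; it is not the original production code. The Consumer parameter is a hypothetical stand-in for recordSaver.persist, and the constructor and main method exist only to make the example self-contained. The batching condition is kept as in the question. The sketch also acquires the lock before the try block, so that finally never unlocks a lock that was not acquired.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

class Aggregator<T> {
    private final Map<String, List<T>> instrumentMap = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final int batchSize;
    // Hypothetical stand-in for recordSaver.persist(...)
    private final Consumer<List<T>> recordSaver;

    Aggregator(int batchSize, Consumer<List<T>> recordSaver) {
        this.batchSize = batchSize;
        this.recordSaver = recordSaver;
    }

    void addAll(List<T> entityList, String threadName) {
        lock.lock(); // acquire outside try
        try {
            List<T> instrumentList =
                instrumentMap.computeIfAbsent(threadName, k -> new ArrayList<>(batchSize));
            // Same threshold check as in the question, evaluated before adding.
            boolean flush = instrumentList.size() >= batchSize - 1;
            instrumentList.addAll(entityList);
            if (flush) {
                // The saver gets its own snapshot, so clear() below cannot affect it.
                recordSaver.accept(new ArrayList<>(instrumentList));
                instrumentList.clear();
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        List<List<String>> saved = new ArrayList<>();
        Aggregator<String> aggregator = new Aggregator<>(2, saved::add);
        aggregator.addAll(Arrays.asList("a"), "t1");
        aggregator.addAll(Arrays.asList("b"), "t1");
        System.out.println(saved); // prints [[a, b]]
    }
}
```

Without the copy, the saver in this demo would end up holding the cleared list and the output would be [[]].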

Amit answered Oct 05 '22 11:10
