Why does computeIfAbsent on a ConcurrentHashMap fail randomly?

I have the following code. It is a toy example, but it reproduces the problem:

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;

public class TestClass3 {
    public static void main(String[] args) throws InterruptedException {
        // Setup data that we will be playing with concurrently
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");

        HashMap<String, List<Integer>> keyValueMap = new HashMap<>();
        for (String key : keys) {
            int[] randomInts = new Random().ints(10000, 0, 10000).toArray();
            keyValueMap.put(key, stream(randomInts).boxed().collect(toList()));
        }

        // Entering danger zone, concurrently transforming our data to another shape
        ExecutorService es = Executors.newFixedThreadPool(10);
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        for (String key : keys) {
            es.submit(() -> {
                for (Integer value : keyValueMap.get(key)) {
                    valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
                }
            });
        }
        // Wait for all tasks in executorservice to finish
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        // Danger zone ends..

        // We should be in a single-thread environment now and safe
        StringBuilder stringBuilder = new StringBuilder();
        for (Integer integer : valueKeyMap.keySet()) {
            String collect = valueKeyMap
                    .get(integer)
                    .stream()
                    .sorted()  // This will blow randomly
                    .collect(Collectors.joining());
            stringBuilder.append(collect);  // just to print something..
        }
        System.out.println(stringBuilder.length());
    }
}

When I run this code over and over, it usually completes without an exception and prints some number. However, from time to time (roughly 1 run in 10) I get an exception like this:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
    at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:369)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at biz.tugay.TestClass3.main(TestClass3.java:40)

I am pretty certain it has something to do with

valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);

If I change this part as follows, I never get an exception:

synchronized (valueKeyMap) {
    valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}

My guess is that computeIfAbsent is still modifying valueKeyMap even after all threads have finished.

Could someone explain why this code fails randomly? Or is there a completely different cause that I am not seeing, and am I wrong to blame computeIfAbsent?

Koray Tugay asked May 10 '20 20:05



2 Answers

The problem isn't in the computeIfAbsent call, but rather in the .add(key) at the end: you can have multiple threads trying to add elements to the same HashSet, with nothing to ensure safe concurrent access. Since HashSet isn't threadsafe, this doesn't work properly, and the HashSet sometimes ends up in a corrupt state. Later, when you try to iterate over the HashSet to get a string, it blows up due to this corrupt state. (Judging from your exception, the HashSet thinks its backing array is longer than it actually is, so it's trying to access out-of-bounds array elements.)

Even in the runs where you don't get an exception, you are probably sometimes "dropping" elements that should have been added, because concurrent updates to the same HashSet can overwrite one another.
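One way to remove the race described above is to make the inner sets themselves thread-safe. The sketch below is only an illustration, not something from the question: the class name ConcurrentSetInversion and the method invert are made up for this example, and it assumes a concurrent set from ConcurrentHashMap.newKeySet() is an acceptable replacement for HashSet.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class ConcurrentSetInversion {

    // Inverts key -> values into value -> keys, running one task per key.
    static Map<Integer, Set<String>> invert(Map<String, List<Integer>> keyValueMap) {
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (Map.Entry<String, List<Integer>> entry : keyValueMap.entrySet()) {
            es.submit(() -> {
                for (Integer value : entry.getValue()) {
                    // The set is created atomically by computeIfAbsent, and the
                    // add that follows is safe because the set is concurrent.
                    valueKeyMap
                            .computeIfAbsent(value, v -> ConcurrentHashMap.newKeySet())
                            .add(entry.getKey());
                }
            });
        }
        es.shutdown();
        try {
            if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return valueKeyMap;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> input = new HashMap<>();
        input.put("a", Arrays.asList(1, 2, 3));
        input.put("b", Arrays.asList(2, 3));
        input.put("c", Arrays.asList(3));
        Map<Integer, Set<String>> result = invert(input);
        // Copy into a TreeSet so the printed order is deterministic.
        System.out.println(new TreeSet<>(result.get(3))); // prints [a, b, c]
    }
}
```

With this change the map and every set stored in it tolerate concurrent writers, so no external synchronized block is needed. The trade-off is that each inner set is backed by its own ConcurrentHashMap, which costs more memory than a plain HashSet.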

ruakh answered Sep 22 '22 09:09


ConcurrentHashMap.computeIfAbsent executes atomically: the check for an existing mapping and the call to the mapping function happen as one step, so the function is applied at most once per key.

However, there is no such guarantee once the value is returned. Several threads can then write to the same HashSet at once, and HashSet is not thread-safe.

Instead, you can do something like this:

valueKeyMap.compute(value, (k, v) -> {
    if (v == null) {
      v = new HashSet<>();
    }
    v.add(key);
    return v;
});

This works because compute is atomic too, so v.add(key) happens inside the atomic operation rather than after it.
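To check that the compute approach really loses no updates, here is a small stress sketch. The class ComputeInversion and the helpers addKey and stress are hypothetical names invented for this example. Several threads add distinct keys under the same map key, and the final set size should equal the total number of adds.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class ComputeInversion {

    // Adds key to the set stored under value. The parameter is deliberately a
    // ConcurrentHashMap: the default Map.compute makes no atomicity promise.
    static void addKey(ConcurrentHashMap<Integer, Set<String>> valueKeyMap,
                       Integer value, String key) {
        valueKeyMap.compute(value, (k, v) -> {
            if (v == null) {
                v = new HashSet<>();
            }
            v.add(key); // runs inside the atomic compute call
            return v;
        });
    }

    // Each of `threads` tasks adds `perThread` distinct keys under map key 0.
    // Returns the size of the resulting set.
    static int stress(int threads, int perThread) {
        ConcurrentHashMap<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            es.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    addKey(valueKeyMap, 0, id + "-" + i);
                }
            });
        }
        es.shutdown();
        try {
            if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return valueKeyMap.get(0).size();
    }

    public static void main(String[] args) {
        System.out.println(stress(8, 10_000)); // prints 80000
    }
}
```

Note that the plain HashSet is only safe here as long as every write goes through compute. Reading the sets from the main thread afterwards is fine because awaitTermination has returned, so all writer tasks have completed before the read.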

Andy Turner answered Sep 25 '22 09:09