I have the following code; it is a toy example, but it makes it possible to reproduce the problem:
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;

public class TestClass3 {
    public static void main(String[] args) throws InterruptedException {
        // Set up the data that we will be playing with concurrently
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
        HashMap<String, List<Integer>> keyValueMap = new HashMap<>();
        for (String key : keys) {
            int[] randomInts = new Random().ints(10000, 0, 10000).toArray();
            keyValueMap.put(key, stream(randomInts).boxed().collect(toList()));
        }

        // Entering the danger zone, concurrently transforming our data to another shape
        ExecutorService es = Executors.newFixedThreadPool(10);
        Map<Integer, Set<String>> valueKeyMap = new ConcurrentHashMap<>();
        for (String key : keys) {
            es.submit(() -> {
                for (Integer value : keyValueMap.get(key)) {
                    valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
                }
            });
        }

        // Wait for all tasks in the ExecutorService to finish
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        // Danger zone ends..

        // We should be in a single-threaded environment now and safe
        StringBuilder stringBuilder = new StringBuilder();
        for (Integer integer : valueKeyMap.keySet()) {
            String collect = valueKeyMap
                    .get(integer)
                    .stream()
                    .sorted() // This will blow up randomly
                    .collect(Collectors.joining());
            stringBuilder.append(collect); // just to print something..
        }
        System.out.println(stringBuilder.length());
    }
}
When I run this code over and over again, it will usually run without any exceptions and print some number. However, from time to time (roughly 1 out of 10 tries) I will get an exception akin to:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
at java.util.stream.SortedOps$SizedRefSortingSink.accept(SortedOps.java:369)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at biz.tugay.TestClass3.main(TestClass3.java:40)
I am pretty certain it has something to do with

valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);

If I change this part as follows, I never get an exception:

synchronized (valueKeyMap) {
    valueKeyMap.computeIfAbsent(value, val -> new HashSet<>()).add(key);
}

I am thinking computeIfAbsent is somehow still modifying the valueKeyMap even after all threads are finished. Could someone explain how come this code fails randomly, and what the reason is? Or is there a totally different reason I am unable to see, and am I wrong in my assumption that computeIfAbsent is to blame?
In ConcurrentHashMap, any number of threads can perform retrieval operations at the same time, but to update the map a thread must lock the particular segment (in Java 8+, the particular bin) in which it wants to operate. This type of locking mechanism is known as segment locking or bucket locking.
ConcurrentHashMap does not throw ConcurrentModificationException if the underlying collection is modified while an iteration is in progress. Its iterators are weakly consistent and may not reflect the exact state of the collection if it is being modified concurrently.
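For instance, here is a minimal sketch (class name and sizes are mine, purely for illustration): one thread iterates while another inserts, no ConcurrentModificationException is thrown, and the printed count can land anywhere between the pre-insert and post-insert sizes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WeaklyConsistentIteration {
    public static void main(String[] args) throws InterruptedException {
        Map<Integer, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 1_000; i++) {
            map.put(i, "v" + i);
        }
        // Keep inserting while the main thread iterates
        Thread writer = new Thread(() -> {
            for (int i = 1_000; i < 2_000; i++) {
                map.put(i, "v" + i);
            }
        });
        writer.start();
        int seen = 0;
        for (Integer ignored : map.keySet()) { // no ConcurrentModificationException here
            seen++;
        }
        writer.join();
        System.out.println(seen); // weakly consistent: anywhere from 1000 to 2000
    }
}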
The computeIfAbsent(..) function essentially has 3 steps: (1) see that the value is missing, (2) compute the Function, and (3) store the value in the map. Since the whole operation is atomic for a given key, there cannot be an interleaving of two threads where A does step 1, then B does step 1, then both do steps 2 and 3.
So two threads changing the same mapping at the very same point in time is not possible: the code within ConcurrentHashMap will not allow two threads to manipulate the same entry in parallel.
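A small sketch of my own (a toy class, not from the question) makes this atomicity visible: ten thousand tasks race on the same key, yet the mapping function only ever runs once, so only one value object is ever created:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ComputeIfAbsentAtomicity {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();
        AtomicInteger mappingFunctionCalls = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10_000; i++) {
            es.submit(() -> map.computeIfAbsent("key", k -> {
                mappingFunctionCalls.incrementAndGet(); // counts actual executions of step (2)
                return new Object();
            }));
        }
        es.shutdown();
        es.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(mappingFunctionCalls.get()); // always 1: steps (1)-(3) are atomic
    }
}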
The problem isn't in the computeIfAbsent call, but rather in the .add(key) at the end: you can have multiple threads trying to add elements to the same HashSet, with nothing to ensure safe concurrent access. Since HashSet isn't thread-safe, this doesn't work properly, and the HashSet sometimes ends up in a corrupt state. Later, when you try to iterate over the HashSet to build a string, it blows up due to this corrupt state. (Judging from your exception, the HashSet thinks its backing array is longer than it actually is, so it tries to access out-of-bounds array elements.)
Even in the runs where you don't get an exception, you probably sometimes end up "dropping" elements that should have been added: concurrent updates to the same HashSet mean that some of them are lost.
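One way to fix exactly this (my own suggestion, assuming Java 8+): keep the computeIfAbsent call but store a concurrent set instead of a plain HashSet, so that the trailing .add(key) is itself thread-safe:

// In the original program, only the set created inside computeIfAbsent changes.
// ConcurrentHashMap.newKeySet() returns a thread-safe Set<String> backed by a
// ConcurrentHashMap, so concurrent add(key) calls neither corrupt it nor drop updates.
valueKeyMap.computeIfAbsent(value, val -> ConcurrentHashMap.newKeySet()).add(key);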
ConcurrentHashMap.computeIfAbsent executes atomically; that is, only one thread can access the value associated with a given key at a time. However, there is no such guarantee once the value is returned. The HashSet can then be accessed by multiple writing threads, and as such is not being accessed thread-safely.
Instead, you can do something like this:
valueKeyMap.compute(value, (k, v) -> {
    if (v == null) {
        v = new HashSet<>();
    }
    v.add(key);
    return v;
});
which works because compute is atomic too: the v.add(key) now happens inside the atomic call, so no two threads can mutate the same set at the same time.
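An equivalent variation (mine, not part of the answer above) uses merge, which ConcurrentHashMap also performs atomically; note that it allocates a throwaway one-element set on every call, which compute avoids:

// Requires java.util.Collections; existing.addAll(..) runs inside the atomic merge,
// so the stored HashSet is only ever mutated while the map holds that entry's lock.
valueKeyMap.merge(value, new HashSet<>(Collections.singleton(key)),
        (existing, single) -> { existing.addAll(single); return existing; });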