I want to periodically iterate over a ConcurrentHashMap while removing entries, like this:
for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // do something
    iter.remove();
}
The problem is that another thread may be updating or modifying values while I'm iterating. If that happens, those updates can be lost forever, because my thread only sees stale values while iterating, but remove() deletes the live entry.
After some consideration, I came up with this workaround:
map.forEach((key, value) -> {
    // delete if value is up to date, otherwise leave for next round
    if (map.remove(key, value)) {
        // do something
    }
});
One problem with this is that it won't catch modifications to mutable values that don't implement equals() (such as AtomicInteger). Is there a better way to safely remove with concurrent modifications?
Your workaround works, but there is one potential problem: if certain entries are updated constantly, map.remove(key, value) may never return true for them until the updates stop.
If you use JDK 8, here is my solution:
for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // compute() is an instance method, so call it on the map itself
    map.compute(entry.getKey(), (k, v) -> f(v));
    // do something with prevValue
}
....
private Integer prevValue;
private Integer f(Integer v) {
    prevValue = v;  // remember the live value
    return null;    // returning null removes the entry
}
compute() applies f(v) to the current value. In our case f stores that value in the prevValue field and returns null, which removes the entry.
According to the Javadoc, the whole invocation is atomic:
Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping). The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.
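As a minimal sketch of the same idea, the captured value can be kept in a per-call holder instead of a shared field, which would matter if more than one thread drained the map. The class name ComputeDrain and the method drain are hypothetical names used only for illustration; the only map behaviour relied on is that ConcurrentHashMap.compute removes the mapping when the remapping function returns null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original answer.
final class ComputeDrain {

    /** Removes every entry the iteration sees and returns the values that were removed. */
    static List<Integer> drain(ConcurrentHashMap<Integer, Integer> map) {
        List<Integer> removed = new ArrayList<>();
        for (Integer key : map.keySet()) {
            // Local holder, so concurrent callers do not share state.
            Integer[] seen = new Integer[1];
            map.compute(key, (k, v) -> {
                seen[0] = v;   // v is the live value, or null if the entry is already gone
                return null;   // returning null removes the mapping
            });
            if (seen[0] != null) {
                removed.add(seen[0]);
            }
        }
        return removed;
    }
}
```

Each removed value is the one that was in the map at the moment of removal, so an update that lands just before compute() runs is not lost. Entries added after the iterator has passed their bin may be left for the next round.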
Your workaround is actually pretty good. There are other facilities on top of which you can build a somewhat similar solution (e.g. using computeIfPresent() and tombstone values), but they have their own caveats and I have used them in slightly different use-cases.
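For reference, here is a rough sketch of what a computeIfPresent()-based conditional removal could look like. It is an assumption about how one might use that method, not the approach taken in the rest of this answer; ConditionalRemoval and removeIf are hypothetical names, and tombstone values are not shown.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical helper illustrating conditional removal with computeIfPresent().
final class ConditionalRemoval {

    /** Removes each entry whose live value satisfies shouldRemove; returns the number removed. */
    static <K, V> int removeIf(ConcurrentHashMap<K, V> map, Predicate<V> shouldRemove) {
        int[] count = new int[1];
        for (K key : map.keySet()) {
            map.computeIfPresent(key, (k, v) -> {
                if (shouldRemove.test(v)) {
                    count[0]++;
                    return null; // returning null removes the mapping
                }
                return v;        // keep the mapping unchanged
            });
        }
        return count[0];
    }
}
```

One caveat with this sketch: the check and the removal are atomic only with respect to other operations on the map for that key. If the value is a mutable object that other threads modify in place without going through the map, those modifications can still race with the predicate.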
As for using a type that doesn't implement equals() for the map values, you can use your own wrapper on top of the corresponding type. That's the most straightforward way to inject custom semantics for object equality into the atomic replace/remove operations provided by ConcurrentMap.
Update
Here's a sketch that shows how you can build on top of the ConcurrentMap.remove(Object key, Object value) API:
1. Define a wrapper type on top of the mutable type you use for the values, with a custom equals() method building on top of the current mutable value.
2. In your BiConsumer (the lambda you're passing to forEach), create a deep copy of the value (which is of your new wrapper type) and perform your logic determining whether the value needs to be removed on the copy.
3. If the value needs to be removed, call remove(myKey, myValueCopy).
If there were concurrent changes while you were deciding whether the value needs to be removed, remove(myKey, myValueCopy) will return false (barring ABA problems, which are a separate topic).
Here's some code illustrating this:
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Playground {

    // Wrapper that gives AtomicInteger value-based equality.
    private static class AtomicIntegerWrapper {
        private final AtomicInteger value;

        AtomicIntegerWrapper(int value) {
            this.value = new AtomicInteger(value);
        }

        public void set(int value) {
            this.value.set(value);
        }

        public int get() {
            return this.value.get();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof AtomicIntegerWrapper)) {
                return false;
            }
            AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
            return other.value.get() == this.value.get();
        }

        // Kept consistent with equals(); the wrapper is only used as a map value here.
        @Override
        public int hashCode() {
            return Integer.hashCode(this.value.get());
        }

        public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
            int wrapped = wrapper.get();
            return new AtomicIntegerWrapper(wrapped);
        }
    }

    private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
            = new ConcurrentHashMap<>();

    private static final int NUM_THREADS = 3;

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; ++i) {
            MAP.put(i, new AtomicIntegerWrapper(1));
        }

        Thread.sleep(1);

        // Updater threads: flip values between 1 and 1986 in place.
        for (int i = 0; i < NUM_THREADS; ++i) {
            new Thread(() -> {
                Random rnd = new Random();
                while (!MAP.isEmpty()) {
                    MAP.forEach((key, value) -> {
                        AtomicIntegerWrapper elem = MAP.get(key);
                        if (elem == null) {
                            // The entry was removed between forEach and get().
                            System.out.println("Oops...");
                        } else if (elem.get() == 1986) {
                            elem.set(1);
                        } else if ((rnd.nextInt() & 128) == 0) {
                            elem.set(1986);
                        }
                    });
                }
            }).start();
        }

        Thread.sleep(1);

        // Remover thread: removes an entry only if it still equals the snapshot.
        new Thread(() -> {
            while (!MAP.isEmpty()) {
                MAP.forEach((key, value) -> {
                    // Snapshot the value passed in, so a concurrent removal cannot cause an NPE.
                    AtomicIntegerWrapper elem = AtomicIntegerWrapper.deepCopy(value);
                    if (elem.get() == 1986) {
                        try {
                            // Simulate slow decision logic so updates can interleave.
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        boolean removed = MAP.remove(key, elem);
                        if (!removed) {
                            System.out.println("Bailed out!");
                        } else {
                            System.out.println("Replaced!");
                        }
                    }
                });
            }
        }).start();
    }
}
You'll see printouts of "Bailed out!" intermixed with "Replaced!" (the removal was successful, as there were no concurrent updates that you care about), and the calculation will stop at some point.
If you remove the custom equals() method and continue to use a copy, you'll see an endless stream of "Bailed out!", because the copy is never considered equal to the value in the map.