
Iterate over ConcurrentHashMap while deleting entries

I want to periodically iterate over a ConcurrentHashMap while removing entries, like this:

for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // do something
    iter.remove();
}

The problem is that another thread may be updating or modifying values while I'm iterating. If that happens, those updates can be lost forever, because my thread only sees stale values while iterating, but the remove() will delete the live entry.
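
To make the race concrete, here is a minimal single-threaded sketch that simulates the other thread's update with a plain put() between next() and remove(). The class and method names (StaleIteratorDemo, loseUpdate) are made up for illustration, and the snapshot behaviour of the returned entry is what I understand OpenJDK 8 and later to do:

```java
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of any library.
class StaleIteratorDemo {

    // Returns {value seen by the iterating code, map size after iter.remove()}.
    static int[] loseUpdate() {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        map.put(1, 10);

        Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator();
        Entry<Integer, Integer> entry = iter.next();

        // Stands in for a concurrent update made by another thread.
        map.put(1, 20);

        int seen = entry.getValue(); // value captured when next() was called
        iter.remove();               // removes key 1 whatever its current value is

        return new int[] { seen, map.size() };
    }

    public static void main(String[] args) {
        int[] result = loseUpdate();
        System.out.println("seen=" + result[0] + ", size=" + result[1]);
    }
}
```

The iterating code only ever sees the old value, yet the entry holding the new value is the one that disappears.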

After some consideration, I came up with this workaround:

map.forEach((key, value) -> {
    // delete if value is up to date, otherwise leave for next round
    if (map.remove(key, value)) {
        // do something
    }
});

One problem with this is that it won't catch modifications to mutable values that don't implement equals() (such as AtomicInteger). Is there a better way to safely remove with concurrent modifications?
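
A small sketch of that limitation, assuming AtomicInteger values that are mutated in place (the class and method names are hypothetical). AtomicInteger inherits Object.equals(), so remove(key, value) can only compare references:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of any library.
class AtomicValueRemoveDemo {

    // Returns true if remove(key, value) succeeded although the value was mutated.
    static boolean removesDespiteMutation() {
        ConcurrentHashMap<Integer, AtomicInteger> map = new ConcurrentHashMap<>();
        map.put(1, new AtomicInteger(10));

        AtomicInteger seen = map.get(1); // what forEach would hand to the lambda
        map.get(1).incrementAndGet();    // stands in for a concurrent in-place update

        // Same reference as the mapped value, so the removal goes through.
        return map.remove(1, seen);
    }

    // Two AtomicIntegers holding the same number are still not equal.
    static boolean equalByValue() {
        return new AtomicInteger(10).equals(new AtomicInteger(10));
    }

    public static void main(String[] args) {
        System.out.println("removed despite mutation: " + removesDespiteMutation());
        System.out.println("equal by value: " + equalByValue());
    }
}
```

So the conditional remove neither notices in-place changes nor matches a copy that holds the same number.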

shmosel asked May 10 '16 00:05


2 Answers

Your workaround works, but there is one potential problem: if certain entries are updated constantly, map.remove(key, value) may never return true until the updates stop.

If you are on JDK 8, here is my solution:

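for (Iterator<Entry<Integer, Integer>> iter = map.entrySet().iterator(); iter.hasNext(); ) {
    Entry<Integer, Integer> entry = iter.next();
    // compute() is an instance method, so call it on the map itself
    map.compute(entry.getKey(), (k, v) -> f(v));
    // do something with prevValue (null if the entry was already gone)
}
....
// only safe if a single thread runs the loop above
private Integer prevValue;

private Integer f(Integer v) {
    prevValue = v;  // remember the live value
    return null;    // returning null removes the mapping
}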

compute() applies f(v) to the current value. In our case f stores that value in the prevValue field and returns null, which removes the entry.

According to the Javadoc, the whole call is atomic:

Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping). The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.
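
As a variation on the same idea, the sketch below keeps the captured value in a local one-element array instead of a shared field, so nothing leaks between iterations. The names ComputeDrainDemo and drain are hypothetical, and I am assuming a ConcurrentHashMap (whose compute() runs the function once, atomically):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; not part of any library.
class ComputeDrainDemo {

    // Removes every key the iteration sees and returns the values
    // that were live at the moment each key was removed.
    static List<Integer> drain(ConcurrentHashMap<Integer, Integer> map) {
        List<Integer> drained = new ArrayList<>();
        for (Integer key : map.keySet()) {
            Integer[] removed = new Integer[1]; // local holder, one per key
            map.compute(key, (k, v) -> {
                removed[0] = v; // current value, or null if the key is already gone
                return null;    // returning null removes the mapping
            });
            if (removed[0] != null) {
                drained.add(removed[0]); // "do something" with the live value
            }
        }
        return drained;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        map.put(1, 10);
        map.put(2, 20);
        System.out.println(drain(map) + " left=" + map.size());
    }
}
```

If nothing else has to happen inside the atomic step, map.remove(key) also returns the previous value atomically and may be all you need.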

xz2145 answered Nov 17 '22 09:11


Your workaround is actually pretty good. There are other facilities on top of which you can build a somewhat similar solution (e.g. using computeIfPresent() and tombstone values), but they have their own caveats and I have used them in slightly different use-cases.
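
For reference, a rough sketch of the computeIfPresent() flavour without tombstones, assuming immutable Integer values and that all updates go through the map. The names ConditionalRemoveDemo and removeIf are hypothetical. The removal decision is made on the live value inside the atomic call:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntPredicate;

// Hypothetical demo class; not part of any library.
class ConditionalRemoveDemo {

    // Removes entries whose live value matches the predicate; returns the count.
    static int removeIf(ConcurrentHashMap<Integer, Integer> map, IntPredicate expired) {
        int[] removed = new int[1]; // only the calling thread touches this counter
        for (Integer key : map.keySet()) {
            map.computeIfPresent(key, (k, v) -> {
                if (expired.test(v)) {
                    removed[0]++;
                    return null; // drop the mapping atomically
                }
                return v;        // keep the mapping unchanged
            });
        }
        return removed[0];
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        map.put(1, 5);
        map.put(2, 50);
        map.put(3, 7);
        System.out.println("removed=" + removeIf(map, v -> v > 10) + " left=" + map);
    }
}
```

This does not help with values that are mutated in place (such as AtomicInteger), because those changes never pass through the map's own operations.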

As for using a type that doesn't implement equals() for the map values, you can use your own wrapper on top of the corresponding type. That's the most straightforward way to inject custom semantics for object equality into the atomic replace/remove operations provided by ConcurrentMap.

Update

Here's a sketch that shows how you can build on top of the ConcurrentMap.remove(Object key, Object value) API:

  • Define a wrapper type around the mutable type you use for the values, with a custom equals() method that compares the current wrapped values.
  • In your BiConsumer (the lambda you're passing to forEach), create a deep copy of the value (an instance of your new wrapper type) and run the logic that decides whether the value should be removed against that copy.
  • If the value needs to be removed, call remove(myKey, myValueCopy).
    • If there have been some concurrent changes while you were calculating whether the value needs to be removed, remove(myKey, myValueCopy) will return false (barring ABA problems, which are a separate topic).

Here's some code illustrating this:

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class Playground {

   private static class AtomicIntegerWrapper {
      private final AtomicInteger value;

      AtomicIntegerWrapper(int value) {
         this.value = new AtomicInteger(value);
      }

      public void set(int value) {
         this.value.set(value);
      }

      public int get() {
         return this.value.get();
      }

      @Override
      public boolean equals(Object obj) {
         if (this == obj) {
            return true;
         }
         if (!(obj instanceof AtomicIntegerWrapper)) {
            return false;
         }
         AtomicIntegerWrapper other = (AtomicIntegerWrapper) obj;
         if (other.value.get() == this.value.get()) {
            return true;
         }
         return false;
      }

      public static AtomicIntegerWrapper deepCopy(AtomicIntegerWrapper wrapper) {
         int wrapped = wrapper.get();
         return new AtomicIntegerWrapper(wrapped);
      }
   }

   private static final ConcurrentMap<Integer, AtomicIntegerWrapper> MAP
         = new ConcurrentHashMap<>();

   private static final int NUM_THREADS = 3;

   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < 10; ++i) {
         MAP.put(i, new AtomicIntegerWrapper(1));
      }

      Thread.sleep(1);

      for (int i = 0; i < NUM_THREADS; ++i) {
         new Thread(() -> {
            Random rnd = new Random();
            while (!MAP.isEmpty()) {
               MAP.forEach((key, value) -> {
                  AtomicIntegerWrapper elem = MAP.get(key);
                  if (elem == null) {
                     System.out.println("Oops...");
                  } else if (elem.get() == 1986) {
                     elem.set(1);
                  } else if ((rnd.nextInt() & 128) == 0) {
                     elem.set(1986);
                  }
               });
            }
         }).start();
      }

      Thread.sleep(1);

      new Thread(() -> {
         Random rnd = new Random();
         while (!MAP.isEmpty()) {
            MAP.forEach((key, value) -> {
               AtomicIntegerWrapper elem =
                     AtomicIntegerWrapper.deepCopy(MAP.get(key));
               if (elem.get() == 1986) {
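                  try {
                     // simulate slow work so updaters get a chance to interfere
                     Thread.sleep(10);
                  } catch (InterruptedException e) {
                     // restore the interrupt flag instead of swallowing it
                     Thread.currentThread().interrupt();
                  }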
                  boolean replaced = MAP.remove(key, elem);
                  if (!replaced) {
                     System.out.println("Bailed out!");
                  } else {
                     System.out.println("Replaced!");
                  }
               }
            });
         }
      }).start();
   }
}

You'll see "Bailed out!" printouts intermixed with "Replaced!" (the removal succeeded because there were no concurrent updates that you care about), and the program stops once the map is empty.

  • If you remove the custom equals() method and continue to use a copy, you'll see an endless stream of "Bailed out!", because the copy is never considered equal to the value in the map.
  • If you don't use a copy, you'll never see "Bailed out!" printed, and you'll hit the problem you describe: values are removed regardless of concurrent changes.

Dimitar Dimitrov answered Nov 17 '22 10:11