ConcurrentHashMap weak consistency: missing past keys during iteration (multi-thread pub/sub)

I'm noticing weird behavior with ConcurrentHashMap iteration. The scenario:

  • A single publisher thread constantly adds new keys to the ConcurrentHashMap. Note that keys are added in sequential order.
  • Multiple subscriber threads try to read all keys currently present in the map.

Many subscribers seem to be missing keys that were added much earlier. For example, when a subscriber copies the map and the copy has size 100, I would expect it to contain at least keys 0 through 99. However, it does not contain all of those keys. Below is sample code that reproduces the issue. In other words, even after, say, key 100 has been added to the map, it appears to be missing on random reads.

As for an explanation, my guess is that when a key is being resolved by chaining and a read-all request comes in, the collision-related keys are not returned. However, I am wondering if anyone else has seen this and can provide a better explanation for the behavior below.
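
For what it's worth, the iterator walks the hash table's bins in table order, not insertion order, so a key inserted during iteration into an already-passed bin is skipped while one landing in a bin still ahead of the iterator is included, which would explain the holes. A minimal standalone sketch (independent of the reproducer below) that makes the scrambled traversal order visible, at least on typical JDK implementations:

import java.util.concurrent.ConcurrentHashMap;

public class IterationOrderDemo {
    public static void main(String[] args) {
        var map = new ConcurrentHashMap<String, Integer>();
        for (int i = 0; i < 10; i++) {
            map.put("key-" + i, i);
        }
        // Prints the keys in hash-table bin order, which is generally
        // not the order in which they were inserted.
        map.keySet().forEach(System.out::println);
    }
}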

The code below has 3 variants:

  1. one where the problem shows up (test())
  2. one where the map is kept big enough that collisions ~"do not occur"; a complete view is then received on a random basis (testWithNoCollisions())
  3. one where a lock is used to ensure that writers are blocked while a subscriber is reading the whole map (testWithLocks())

import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

public class ConcurrentHashMapWeakConsistency {

    public static void main(String[] args) throws InterruptedException {
        test();
//        testWithNoCollisions(); // Sometimes passes, when there are no collisions
//        testWithLocks(); // This one works, but it uses an external synchronization mechanism
    }

    public static void test() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, subscriberCount).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
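                // NOTE: copying keySet() iterates the live map while the publisher
                // keeps inserting; the iterator is weakly consistent, so keys added
                // during the traversal may or may not end up in the copy.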
                var size = existingKeys.size();

                //Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = "key-" + missingKeys.get(0);
                    var missingEnd = "key-" + missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithNoCollisions() throws InterruptedException {
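        // Pre-size the table so it never resizes during the test; with a load
        // factor of 0.25, an initial capacity of 1,000,000 yields roughly 4 million
        // bins for only 10,000 keys, so collisions are rare.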
        var map = new ConcurrentHashMap<String, Integer>(1_000_000, .25f);

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 10_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, subscriberCount).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();

                //Note the keys are inserted by publisher in sequential order.
                // Hence, existing keys values should have all keys from range 0 to size-1
                // This is where weak consistency shows up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithLocks() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        var subscriberActive = new AtomicBoolean(false);
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                // busy-wait until this thread owns the shared flag (a crude spin lock)
                while (!subscriberActive.compareAndSet(false, true)) ;
                map.put(key, i);
                subscriberActive.compareAndSet(true, false); // release the flag
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, subscriberCount).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                // take the same spin lock, so no put can interleave with the copy
                while (!subscriberActive.compareAndSet(false, true)) ;
                var existingKeys = new HashSet<>(map.keySet());
                subscriberActive.compareAndSet(true, false); // release the flag

                var size = existingKeys.size();

                // The copy above was taken while the publisher was blocked, so it
                // should contain exactly the keys 0 to size-1. Unlike test(),
                // missingKeys should always be empty here.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }
}

It appears that when concurrent writes and full reads of a ConcurrentHashMap overlap, the reads often return results that are missing keys from the past. I was expecting the read to provide a snapshot of the map at a point in time; however, based on the test code above, that does not seem to be the case.



1 Answer

Not transactional

ConcurrentHashMap is not a transactional database.

Update status is per key, according to the documentation:

Retrieval operations (including get) generally do not block, so may overlap with update operations (including put and remove). Retrievals reflect the results of the most recently completed update operations holding upon their onset. (More formally, an update operation for a given key bears a happens-before relation with any (non-null) retrieval for that key reporting the updated value.) For aggregate operations such as putAll and clear, concurrent retrievals may reflect insertion or removal of only some entries.

Similarly, Iterators, Spliterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. They do not throw ConcurrentModificationException. However, iterators are designed to be used by only one thread at a time.

Bear in mind that the results of aggregate status methods including size, isEmpty, and containsValue are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.
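
The iterator paragraph is the relevant one here: a keySet() copy taken while puts are in flight may include or miss concurrently added keys, so "the copy has N entries" never implies "keys 0 to N-1 are all present". If a true point-in-time snapshot is required, it has to be enforced outside the map. Below is a minimal sketch of one common approach (the class and method names are just for illustration): invert a ReentrantReadWriteLock so that publishers put under the shared read lock (ConcurrentHashMap keeps concurrent puts safe), while a snapshot takes the exclusive write lock and blocks all puts for the duration of the copy.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SnapshotMap {
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void put(String key, Integer value) {
        lock.readLock().lock();  // shared: many puts may run concurrently
        try {
            map.put(key, value);
        } finally {
            lock.readLock().unlock();
        }
    }

    public Set<String> snapshotKeys() {
        lock.writeLock().lock(); // exclusive: no put can overlap the copy
        try {
            return new HashSet<>(map.keySet());
        } finally {
            lock.writeLock().unlock();
        }
    }
}

Unlike the busy-wait flag in testWithLocks(), this still lets puts proceed concurrently with each other; only snapshots are exclusive.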



