Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ConcurrentHashMap wait for key possible?

i have multithread communication. 1 Thread is dispatching datas to other threads.

Main thread is pushing data:

Main Thread: ConcurrentHashMap map = Global.getInstance().getMap(); //push data to some other thread map.put(1,"Test");

Thread 1: String data = map.get(1); //returns null directly , but i want to wait until data pushed

Thread 1 returns null if main thread doesn't push any data. But i want to wait until i got data , how can i wait ?

TransferQueue is not good solution to my current implementation. I have to do with ConcurrentHashMap.

Does someone know any solution?

like image 409
Kadir BASOL Avatar asked Feb 13 '23 21:02

Kadir BASOL


2 Answers

You can create a BlockingMap, something like this; depending on usage, you should also device a mechanism to remove unused keys and queues associated to them in order to avoid a memory leak.

public class BlockingMap<K, V> {
    private final Map<K, BlockingQueue<V>> map = new ConcurrentHashMap<>();

    private synchronized BlockingQueue<V> ensureQueueExists(K key) {
        //concurrentMap.putIfAbsent would require creating a new
        //blocking queue each time put or get is called
        if (map.containsKey(key)) {
            return map.get(key);
        } else {
            BlockingQueue<V> queue = new ArrayBlockingQueue<>(1);
            map.put(key, queue);
            return queue;
        }
    }

    public boolean put(K key, V value, long timeout, TimeUnit timeUnit) {
        BlockingQueue<V> queue = ensureQueueExists(key);
        try {
            return queue.offer(value, timeout, timeUnit);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public V get(K key, long timeout, TimeUnit timeUnit) {
        BlockingQueue<V> queue = ensureQueueExists(key);
        try {
            return queue.poll(timeout, timeUnit);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}

Since Java 8, ensureQueueExists can be written:

private synchronized BlockingQueue<V> ensureQueueExists(K key) {
    return map.computeIfAbsent(key, k -> new ArrayBlockingQueue<>(1));
}
like image 75
Random42 Avatar answered Feb 15 '23 10:02

Random42


You can use BlockingQueue if you allowed to change your implementation, or use a wait/notify technique to wait and run when occupy.

  String value = null;
    while(true) {
        if((value = map.get(1)) != null) { // VARY IMPORTANT to use get and !=
            // work with value 
        } else {
            synchronized (map) {
                try {
                     map.wait();
                 } catch (InterruptedException ie) {}
            }
        }
    }

and for producer:

    String value = "Test";
    map.put(1,value);
    synchronized (map) {
       map.notifyAll(); // or notify().
    }
like image 27
mohamed sulibi Avatar answered Feb 15 '23 09:02

mohamed sulibi