I would like to implement the following logic:
-the following structure is to be used
//Map<String, CopyOnWriteArrayList> keeping the pending updates
//grouped by the id of the updated object
final Map<String, List<Update>> updatesPerId = new ConcurrentHashMap<>();
-n producers will add updates to updatesPerId map (for the same id, 2 updates can be added at the same time)
-one TimerThread will run from time to time and has to process the received updates. Something like:
final Map<String, List<Update>> toBeProcessed = new HashMap<>(updatesPerId);
updatesPerId.clear();
// iterate over toBeProcessed and process them
Is there any way to make this logic thread-safe without synchronizing the producers' adding logic with the timer thread (the consumer)? I am thinking of an atomic clear+get, but ConcurrentMap does not seem to provide one. Also, updates must stay grouped by the id of the updated object, so I cannot replace the map with a queue or something else.
Any ideas? Thanks!
You can leverage the fact that ConcurrentHashMap.compute executes atomically.
You can put into updatesPerId like so:
updatesPerId.compute(id, (k, list) -> {
    if (list == null) list = new ArrayList<>();
    // ... add to the list
    // Return a non-null list, so the key/value pair is stored in the map.
    return list;
});
Note that this does not call computeIfAbsent and then add to the returned list; that would not be atomic, because the add would happen after computeIfAbsent has already returned.
Then in your thread to remove things:
for (String key : updatesPerId.keySet()) {
    updatesPerId.compute(key, (k, list) -> {
        // ... Process the contents of the list.
        // Returning null removes the key/value pair from the map.
        return null;
    });
}
So, adding an update to the list for a key (or processing all the values for that key) might block if you happen to work on the same key in both places at once; otherwise, it will not be blocked.
Edit: as pointed out by @StuartMarks, it might be better to simply get all things out of the map first, and then process them later, in order to avoid blocking other threads trying to add:
Map<String, List<Update>> newMap = new HashMap<>();
for (String key : updatesPerId.keySet()) {
    newMap.put(key, updatesPerId.remove(key));
}
// ... Process entries in newMap.
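To make the two halves concrete, here is a minimal sketch that combines the compute-based add with the remove-based drain. The class name PendingUpdates, the method names add and drain, and the type parameter U (standing in for Update) are assumptions made for illustration; they are not from the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder class; U stands in for the question's Update type.
class PendingUpdates<U> {
    private final Map<String, List<U>> updatesPerId = new ConcurrentHashMap<>();

    // Producer side: create and mutate the list inside compute, which
    // ConcurrentHashMap performs atomically for the given key.
    void add(String id, U update) {
        updatesPerId.compute(id, (k, list) -> {
            if (list == null) list = new ArrayList<>();
            list.add(update);
            return list;
        });
    }

    // Consumer side: detach each list with remove; producers that arrive
    // afterwards start a fresh list for that id.
    Map<String, List<U>> drain() {
        Map<String, List<U>> drained = new HashMap<>();
        for (String key : updatesPerId.keySet()) {
            List<U> list = updatesPerId.remove(key);
            if (list != null) drained.put(key, list);
        }
        return drained;
    }

    public static void main(String[] args) throws InterruptedException {
        PendingUpdates<String> pending = new PendingUpdates<>();
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            final int n = t;
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    pending.add("id-" + (i % 3), "p" + n + "-" + i);
                }
            });
            producers[t].start();
        }
        int total = 0;
        // Drain a few times while the producers may still be running.
        for (int round = 0; round < 5; round++) {
            for (List<String> list : pending.drain().values()) total += list.size();
        }
        for (Thread p : producers) p.join();
        // Final drain after all producers have finished.
        for (List<String> list : pending.drain().values()) total += list.size();
        System.out.println(total); // expected: 4000 (4 producers x 1000 updates)
    }
}
```

Once a list has been removed from the map, no producer can reach it any more, so the consumer can read the plain ArrayList without extra synchronization. This sketch assumes a single consumer thread, as in the question.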
I'd suggest using LinkedBlockingQueue instead of CopyOnWriteArrayList as the map value. With COWAL, every add copies the whole backing array, so adds get successively more expensive and adding N elements costs O(N^2) in total. LBQ addition is O(1). Also, LBQ has drainTo, which can be used effectively here. You could do this:
final Map<String, BlockingQueue<Update>> updatesPerId = new ConcurrentHashMap<>();
Producer:
updatesPerId.computeIfAbsent(id, k -> new LinkedBlockingQueue<>()).add(update);
Consumer:
updatesPerId.forEach((id, queue) -> {
    List<Update> updates = new ArrayList<>();
    queue.drainTo(updates);
    processUpdates(id, updates);
});
This is somewhat different from what you had suggested. This technique processes the updates for each id, but lets producers continue to add updates to the map while this is going on. This leaves map entries and queues in the map for each id. If the ids end up getting reused a lot, the number of map entries will plateau at a high-water mark.
If new ids are continually coming in, and old ids becoming disused, the map will grow continually, which probably isn't what you want. If this is the case you could use the technique in Andy Turner's answer.
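For reference, the producer and consumer snippets can be put together roughly as follows. This is only a sketch: the class QueuedUpdates, the methods add and drainEach, and the type parameter U (standing in for Update) are made-up names, and skipping ids with nothing pending is a choice of this sketch rather than part of the technique.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

// Hypothetical holder class; U stands in for the question's Update type.
class QueuedUpdates<U> {
    private final ConcurrentMap<String, BlockingQueue<U>> updatesPerId =
            new ConcurrentHashMap<>();

    // Producer side: the queue itself is thread-safe, so adding to it
    // after computeIfAbsent returns is fine.
    void add(String id, U update) {
        updatesPerId.computeIfAbsent(id, k -> new LinkedBlockingQueue<>()).add(update);
    }

    // Consumer side: drain each queue into a private list and hand it over.
    // The queues stay in the map, so producers can keep adding meanwhile.
    void drainEach(BiConsumer<String, List<U>> processor) {
        updatesPerId.forEach((id, queue) -> {
            List<U> updates = new ArrayList<>();
            queue.drainTo(updates);
            if (!updates.isEmpty()) processor.accept(id, updates); // skip idle ids
        });
    }

    public static void main(String[] args) {
        QueuedUpdates<String> pending = new QueuedUpdates<>();
        pending.add("a", "u1");
        pending.add("a", "u2");
        pending.add("b", "u3");
        pending.drainEach((id, updates) -> System.out.println(id + " -> " + updates));
    }
}
```

Updates added to a queue after its drainTo call are not lost; they simply wait for the next timer run.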
If the consumer really needs to snapshot and clear the entire map, I think you have to use locking, which you wanted to avoid.
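One possible shape for such locking, purely as a sketch and not something I have measured: producers share the read lock of a ReentrantReadWriteLock, so they do not block each other, and the consumer takes the write lock only long enough to swap in a fresh map. The names SnapshotUpdates, add, and snapshotAndClear, and the type parameter U (standing in for Update), are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical holder class; U stands in for the question's Update type.
class SnapshotUpdates<U> {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Replaced only while the write lock is held.
    private Map<String, List<U>> updatesPerId = new ConcurrentHashMap<>();

    // Producers hold the shared read lock, so they can run concurrently;
    // compute keeps the per-key list update atomic.
    void add(String id, U update) {
        lock.readLock().lock();
        try {
            updatesPerId.compute(id, (k, list) -> {
                if (list == null) list = new ArrayList<>();
                list.add(update);
                return list;
            });
        } finally {
            lock.readLock().unlock();
        }
    }

    // The consumer holds the exclusive write lock just long enough to swap
    // maps; it waits for in-flight adds and briefly blocks new ones.
    Map<String, List<U>> snapshotAndClear() {
        lock.writeLock().lock();
        try {
            Map<String, List<U>> snapshot = updatesPerId;
            updatesPerId = new ConcurrentHashMap<>();
            return snapshot;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        SnapshotUpdates<String> pending = new SnapshotUpdates<>();
        pending.add("a", "u1");
        pending.add("b", "u2");
        Map<String, List<String>> toBeProcessed = pending.snapshotAndClear();
        System.out.println(toBeProcessed.size() + " ids in snapshot");
    }
}
```

The returned map is no longer reachable by producers, so the timer thread can iterate over it at leisure. The cost is that producers do take a lock, which is what the question hoped to avoid.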