
ConcurrentHashMap[String, AtomicInteger] or ConcurrentHashMap[String, Int] for thread-safe counters?

When incrementing concurrent counters by key in a ConcurrentHashMap, is it safe to use a regular Int for the value, or do we have to use AtomicInteger? For example, consider the following two implementations:

ConcurrentHashMap[String, Int]

final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
  import scala.collection.JavaConverters._
  private val chm = new ConcurrentHashMap[String, Int]().asScala

  system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())

  def countRequest(key: String): Unit =
    chm.get(key) match {
      case Some(value) => chm.update(key, value + 1)
      case None => chm.update(key, 1)
    }

  private def resetCount(key: String) = chm.replace(key, 0)

  private def publishAllMetrics(): Unit =
    chm foreach { case (key, value) =>
      // publishMetric(key, value.doubleValue())
      resetCount(key)
    }
}

ConcurrentHashMap[String, AtomicInteger]

final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
  import scala.collection.JavaConverters._
  private val chm = new ConcurrentHashMap[String, AtomicInteger]().asScala

  system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())

  def countRequest(key: String): Unit =
    chm.getOrElseUpdate(key, new AtomicInteger(1)).incrementAndGet()
  
  private def resetCount(key: String): Unit =
    chm.getOrElseUpdate(key, new AtomicInteger(0)).set(0)

  private def publishAllMetrics(): Unit =
    chm foreach { case (key, value) =>
      // publishMetric(key, value.doubleValue())
      resetCount(key)
    }
}

Is the former implementation safe? If not, at what point in the snippet can a race condition be introduced, and why?


The context of the question is AWS CloudWatch metrics, which can get very expensive on high-frequency APIs if posted on each request. So I am trying to "batch" them up and publish them periodically.

Mario Galic asked Oct 23 '25 02:10



1 Answer

The first implementation is not correct, because the countRequest method is not atomic. Consider this sequence of events:

  • Threads A and B both call countRequest with key "foo"
  • Thread A obtains the counter value, let's call it x
  • Thread B obtains the counter value. It's the same value x, because Thread A hasn't updated the counter yet.
  • Thread B updates the map with the new counter value, x+1
  • Thread A updates the map, and because it obtained the counter value before B wrote the new counter value, it also writes x+1.

The counter should be x+2, but it is x+1. It's a classic lost update problem.
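The interleaving above can be replayed deterministically on a single thread, which makes the lost update easy to see. This is only a sketch: `LostUpdateDemo`, the starting value 5, and the variable names are made up for illustration, and the two "threads" are simulated by ordering the reads and writes by hand.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical demo object; it replays the A/B interleaving step by step.
object LostUpdateDemo {
  def run(): Int = {
    val chm = new ConcurrentHashMap[String, Int]()
    chm.put("foo", 5)                 // the counter starts at x = 5

    val readByA = chm.get("foo")      // Thread A reads x
    val readByB = chm.get("foo")      // Thread B reads the same x

    chm.put("foo", readByB + 1)       // Thread B writes x + 1
    chm.put("foo", readByA + 1)       // Thread A also writes x + 1

    chm.get("foo")                    // 6, although two requests were counted
  }
}
```

Each individual `get` and `put` is thread-safe, but the read-modify-write sequence as a whole is not, which is why one increment disappears.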

The second implementation has a similar problem due to the use of the `getOrElseUpdate` method. `ConcurrentHashMap` does not have that method, therefore the Scala wrapper needs to emulate it. I think the implementation is the one inherited from `scala.collection.mutable.MapOps`, and it is defined like so:

```scala
def getOrElseUpdate(key: K, op: => V): V =
  get(key) match {
    case Some(v) => v
    case None => val d = op; this(key) = d; d
  }
```

This is obviously not atomic.
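If the `AtomicInteger` variant is kept, one way to avoid a non-atomic check-then-insert is to call `computeIfAbsent` on the Java map itself, which applies the creation function at most once per key. A minimal sketch, assuming the map is used directly rather than through `asScala`; the class name `AtomicCounters` and the `current` helper are hypothetical:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter class keeping one AtomicInteger per key.
final class AtomicCounters {
  private val counts = new ConcurrentHashMap[String, AtomicInteger]()

  def countRequest(key: String): Unit = {
    // computeIfAbsent inserts the counter atomically if the key is missing.
    // Start at 0 so that the first request is counted as 1.
    counts.computeIfAbsent(key, (_: String) => new AtomicInteger(0)).incrementAndGet()
    ()
  }

  // Read the current count; an absent key is reported as 0.
  def current(key: String): Int = {
    val counter = counts.get(key)
    if (counter == null) 0 else counter.get()
  }
}
```

Here the map guarantees that all threads see the same `AtomicInteger` for a key, and the `AtomicInteger` makes the increment itself atomic.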

To implement this correctly, use the compute method on ConcurrentHashMap.

This method will execute atomically, so you won't need an AtomicInteger.
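A minimal sketch of what a `compute`-based counter might look like, assuming the Java map is used directly (without `asScala`) and stores boxed `Integer` values. The class name `RequestCounters` and the helpers `current` and `takeCount` are hypothetical and not part of the question's code:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical counter class; names are illustrative only.
final class RequestCounters {
  private val counts = new ConcurrentHashMap[String, Integer]()

  def countRequest(key: String): Unit = {
    // compute runs the remapping function atomically for this key.
    // The old value is null when the key is not present yet.
    counts.compute(key, (_: String, old: Integer) =>
      if (old == null) Integer.valueOf(1) else Integer.valueOf(old.intValue + 1))
    ()
  }

  // Read the current count; an absent key is reported as 0.
  def current(key: String): Int = {
    val value = counts.get(key)
    if (value == null) 0 else value.intValue
  }

  // Atomically remove the entry and return its last value (0 if absent),
  // so a periodic publisher could read and reset in a single step.
  def takeCount(key: String): Int = {
    val previous = counts.remove(key)
    if (previous == null) 0 else previous.intValue
  }
}
```

Reading and resetting through a single `remove` call is one possible design for the publishing step: an increment that arrives between a separate read and a later reset would otherwise be dropped.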

Matthias Berndt answered Oct 25 '25 17:10



