When incrementing concurrent counters by key in a ConcurrentHashMap, is it safe to use a regular Int for the value, or do we have to use AtomicInteger? For example, consider the following two implementations:
ConcurrentHashMap[String, Int]
final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
  import scala.collection.JavaConverters._
  private val chm = new ConcurrentHashMap[String, Int]().asScala
  system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())
  def countRequest(key: String): Unit =
    chm.get(key) match {
      case Some(value) => chm.update(key, value + 1)
      case None => chm.update(key, 1)
    }
  private def resetCount(key: String) = chm.replace(key, 0)
  private def publishAllMetrics(): Unit =
    chm foreach { case (key, value) =>
      // publishMetric(key, value.doubleValue())
      resetCount(key)
    }
}
ConcurrentHashMap[String, AtomicInteger]
final class ExpensiveMetrics(implicit system: ActorSystem, ec: ExecutionContext) {
  import scala.collection.JavaConverters._
  private val chm = new ConcurrentHashMap[String, AtomicInteger]().asScala
  system.scheduler.schedule(5.seconds, 60.seconds)(publishAllMetrics())
  def countRequest(key: String): Unit =
    chm.getOrElseUpdate(key, new AtomicInteger(1)).incrementAndGet()
  private def resetCount(key: String): Unit =
    chm.getOrElseUpdate(key, new AtomicInteger(0)).set(0)
  private def publishAllMetrics(): Unit =
    chm foreach { case (key, value) =>
      // publishMetric(key, value.doubleValue())
      resetCount(key)
    }
}
Is the former implementation safe? If not, at what point in the snippet can a race condition be introduced, and why?
The context of the question is AWS CloudWatch metrics, which can get very expensive on high-frequency APIs if posted on each request. So I am trying to "batch" them up and publish them periodically.
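The first implementation is not correct, because the countRequest method is not atomic: the read (chm.get) and the write (chm.update) are two separate operations. Consider this sequence of events:
Threads A and B both call countRequest with key "foo". Both read the current counter value x before either of them has written. Thread A writes x+1, and then thread B, still holding the stale x, also writes x+1.
The counter should be x+2, but it is x+1. It's a classic lost update problem.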
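The lost update can be replayed deterministically on a single thread by doing both reads before either write, which is the order two racing threads may end up in. This is only an illustrative sketch, not the asker's code: it uses a plain ConcurrentHashMap[String, Integer], and the names LostUpdateReplay, readByA and readByB are made up for the example.

```scala
import java.util.concurrent.ConcurrentHashMap

object LostUpdateReplay {
  // Replays the racy get-then-update sequence step by step on one thread.
  def replay(): Int = {
    val chm = new ConcurrentHashMap[String, Integer]()
    chm.put("foo", Integer.valueOf(5))            // starting value x = 5

    val readByA = chm.get("foo").intValue         // "thread A" reads x
    val readByB = chm.get("foo").intValue         // "thread B" reads the same x

    chm.put("foo", Integer.valueOf(readByA + 1))  // A writes x + 1
    chm.put("foo", Integer.valueOf(readByB + 1))  // B overwrites it with x + 1

    chm.get("foo").intValue                       // x + 1, although two increments ran
  }
}
```

Each individual get and put here is thread-safe; the problem is that nothing ties a read to the write that depends on it.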
To implement this correctly, use the compute method on ConcurrentHashMap.
This method will execute atomically, so you won't need an AtomicInteger.
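A minimal sketch of that approach, under a few assumptions: the Java map is used directly rather than through asScala, since compute is a method of java.util.concurrent.ConcurrentHashMap; values are typed as java.lang.Integer so the null passed for an absent key can be handled explicitly; and the scheduler and metric publishing are left out. The names ComputeCounter, increment and count are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

final class ComputeCounter {
  private val chm = new ConcurrentHashMap[String, Integer]()

  // Remapping function: `current` is null when the key is not in the map yet.
  private val increment = new BiFunction[String, Integer, Integer] {
    override def apply(key: String, current: Integer): Integer =
      Integer.valueOf(if (current == null) 1 else current.intValue + 1)
  }

  // compute performs the read-modify-write for this key as one atomic step.
  def countRequest(key: String): Unit = {
    chm.compute(key, increment)
    ()
  }

  // Current count for a key, or 0 if the key was never counted.
  def count(key: String): Int = {
    val v = chm.get(key)
    if (v == null) 0 else v.intValue
  }
}
```

With this version, calling countRequest("foo") N times in total from any number of threads should leave count("foo") at N, because no increment can read a value that another increment is about to overwrite.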