What is the best way to implement global counters for a highly concurrent application? In my case I may have 10K-20K go routines performing "work", and I want to count the number and types of items that the routines are working on collectively...
The "classic" synchronous coding style would look like:
var work_counter int
func GoWorkerRoutine() {
for {
// do work
atomic.AddInt32(&work_counter,1)
}
}
Now this gets more complicated because I want to track the "type" of work being done, so really I'd need something like this:
var work_counter map[string]int
var work_mux sync.Mutex
func GoWorkerRoutine() {
for {
// do work
work_mux.Lock()
work_counter["type1"]++
work_mux.Unlock()
}
}
It seems like there should be a "go" optimized way using channels or something similar to this:
var work_counter int
var work_chan chan int // make() called somewhere else (buffered)
// started somewher else
func GoCounterRoutine() {
for {
select {
case c := <- work_chan:
work_counter += c
break
}
}
}
func GoWorkerRoutine() {
for {
// do work
work_chan <- 1
}
}
This last example is still missing the map, but that's easy enough to add. Will this style provide better performance than just a simple atomic increment? I can't tell if this is more or less complicated when we're talking about concurrent access to a global value versus something that may block on I/O to complete...
Thoughts are appreciated.
Update 5/28/2013:
I tested a couple implementations, and the results were not what I expected, here's my counter source code:
package helpers
import (
)
type CounterIncrementStruct struct {
bucket string
value int
}
type CounterQueryStruct struct {
bucket string
channel chan int
}
var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int
func CounterInitialize() {
counter = make(map[string]int)
counterIncrementChan = make(chan CounterIncrementStruct,0)
counterQueryChan = make(chan CounterQueryStruct,100)
counterListChan = make(chan chan map[string]int,100)
go goCounterWriter()
}
func goCounterWriter() {
for {
select {
case ci := <- counterIncrementChan:
if len(ci.bucket)==0 { return }
counter[ci.bucket]+=ci.value
break
case cq := <- counterQueryChan:
val,found:=counter[cq.bucket]
if found {
cq.channel <- val
} else {
cq.channel <- -1
}
break
case cl := <- counterListChan:
nm := make(map[string]int)
for k, v := range counter {
nm[k] = v
}
cl <- nm
break
}
}
}
func CounterIncrement(bucket string, counter int) {
if len(bucket)==0 || counter==0 { return }
counterIncrementChan <- CounterIncrementStruct{bucket,counter}
}
func CounterQuery(bucket string) int {
if len(bucket)==0 { return -1 }
reply := make(chan int)
counterQueryChan <- CounterQueryStruct{bucket,reply}
return <- reply
}
func CounterList() map[string]int {
reply := make(chan map[string]int)
counterListChan <- reply
return <- reply
}
It uses channels for both writes and reads which seems logical.
Here are my test cases:
func bcRoutine(b *testing.B,e chan bool) {
for i := 0; i < b.N; i++ {
CounterIncrement("abc123",5)
CounterIncrement("def456",5)
CounterIncrement("ghi789",5)
CounterIncrement("abc123",5)
CounterIncrement("def456",5)
CounterIncrement("ghi789",5)
}
e<-true
}
func BenchmarkChannels(b *testing.B) {
b.StopTimer()
CounterInitialize()
e:=make(chan bool)
b.StartTimer()
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
go bcRoutine(b,e)
<-e
<-e
<-e
<-e
<-e
}
var mux sync.Mutex
var m map[string]int
func bmIncrement(bucket string, value int) {
mux.Lock()
m[bucket]+=value
mux.Unlock()
}
func bmRoutine(b *testing.B,e chan bool) {
for i := 0; i < b.N; i++ {
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
}
e<-true
}
func BenchmarkMutex(b *testing.B) {
b.StopTimer()
m=make(map[string]int)
e:=make(chan bool)
b.StartTimer()
for i := 0; i < b.N; i++ {
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
bmIncrement("abc123",5)
bmIncrement("def456",5)
bmIncrement("ghi789",5)
}
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
go bmRoutine(b,e)
<-e
<-e
<-e
<-e
<-e
}
I implemented a simple benchmark with just a mutex around the map (just testing writes), and benchmarked both with 5 goroutines running in parallel. Here are the results:
$ go test --bench=. helpers
PASS
BenchmarkChannels 100000 15560 ns/op
BenchmarkMutex 1000000 2669 ns/op
ok helpers 4.452s
I would not have expected the mutex to be that much faster...
Further thoughts?
If you're trying to synchronize a pool of workers (e.g. allow n goroutines to crunch away at some amount of work) then channels are a very good way to go about it, but if all you actually need is a counter (e.g page views) then they are overkill. The sync and sync/atomic packages are there to help.
import "sync/atomic"
type count32 int32
func (c *count32) inc() int32 {
return atomic.AddInt32((*int32)(c), 1)
}
func (c *count32) get() int32 {
return atomic.LoadInt32((*int32)(c))
}
Go Playground Example
Don't use sync/atomic - from the linked page
Package atomic provides low-level atomic memory primitives useful for implementing synchronization algorithms. These functions require great care to be used correctly. Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package
Last time I had to do this I benchmarked something which looked like your second example with a mutex and something which looked like your third example with a channel. The channels code won when things got really busy, but make sure you make the channel buffer big.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With