I am making a cache wrapper around a database. To account for possibly slow database calls, I was thinking of a mutex per key (pseudo Go code):
mutexes = map[string]*sync.Mutex // instance variable
mutexes[key].Lock()
defer mutexes[key].Unlock()
if value, ok := cache.find(key); ok {
return value
}
value = databaseCall(key)
cache.save(key, value)
return value
However I don't want my map to grow too much. My cache is an LRU and I want to have a fixed size for some other reasons not mentioned here. I would like to do something like
delete(mutexes, key)
when all the locks on the key are over but... that doesn't look thread safe to me... How should I do it?
Note: I found this question In Go, can we synchronize each key of a map using a lock per key? but no answer
A Mutex is used to provide a locking mechanism to ensure that only one Goroutine is running the critical section of code at any point in time to prevent race conditions from happening. Mutex is available in the sync package. There are two methods defined on Mutex namely Lock and Unlock.
Mutexes are data structures provided by the sync package. They can help us place a lock on different sections of data so that only one goroutine can access it at a time.
RWMutex – Safely Allow Multiple Readers A read/write mutex allows all the readers to access the map at the same time, but a writer will lock out everyone else.
A map of mutexes is an efficient way to accomplish this, however the map itself must also be synchronized. A reference count can be used to keep track of entries in concurrent use and remove them when no longer needed. Here is a working map of mutexes complete with a test and benchmark.
(UPDATE: This package provides similar functionality: https://pkg.go.dev/golang.org/x/sync/singleflight )
mapofmu.go
// Package mapofmu provides locking per-key.
// For example, you can acquire a lock for a specific user ID and all other requests for that user ID
// will block until that entry is unlocked (effectively your work load will be run serially per-user ID),
// and yet have work for separate user IDs happen concurrently.
package mapofmu
import (
"fmt"
"sync"
)
// M wraps a map of mutexes. Each key locks separately.
type M struct {
ml sync.Mutex // lock for entry map
ma map[interface{}]*mentry // entry map
}
type mentry struct {
m *M // point back to M, so we can synchronize removing this mentry when cnt==0
el sync.Mutex // entry-specific lock
cnt int // reference count
key interface{} // key in ma
}
// Unlocker provides an Unlock method to release the lock.
type Unlocker interface {
Unlock()
}
// New returns an initalized M.
func New() *M {
return &M{ma: make(map[interface{}]*mentry)}
}
// Lock acquires a lock corresponding to this key.
// This method will never return nil and Unlock() must be called
// to release the lock when done.
func (m *M) Lock(key interface{}) Unlocker {
// read or create entry for this key atomically
m.ml.Lock()
e, ok := m.ma[key]
if !ok {
e = &mentry{m: m, key: key}
m.ma[key] = e
}
e.cnt++ // ref count
m.ml.Unlock()
// acquire lock, will block here until e.cnt==1
e.el.Lock()
return e
}
// Unlock releases the lock for this entry.
func (me *mentry) Unlock() {
m := me.m
// decrement and if needed remove entry atomically
m.ml.Lock()
e, ok := m.ma[me.key]
if !ok { // entry must exist
m.ml.Unlock()
panic(fmt.Errorf("Unlock requested for key=%v but no entry found", me.key))
}
e.cnt-- // ref count
if e.cnt < 1 { // if it hits zero then we own it and remove from map
delete(m.ma, me.key)
}
m.ml.Unlock()
// now that map stuff is handled, we unlock and let
// anything else waiting on this key through
e.el.Unlock()
}
mapofmu_test.go:
package mapofmu
import (
"math/rand"
"strconv"
"strings"
"sync"
"testing"
"time"
)
func TestM(t *testing.T) {
r := rand.New(rand.NewSource(42))
m := New()
_ = m
keyCount := 20
iCount := 10000
out := make(chan string, iCount*2)
// run a bunch of concurrent requests for various keys,
// the idea is to have a lot of lock contention
var wg sync.WaitGroup
wg.Add(iCount)
for i := 0; i < iCount; i++ {
go func(rn int) {
defer wg.Done()
key := strconv.Itoa(rn)
// you can prove the test works by commenting the locking out and seeing it fail
l := m.Lock(key)
defer l.Unlock()
out <- key + " A"
time.Sleep(time.Microsecond) // make 'em wait a mo'
out <- key + " B"
}(r.Intn(keyCount))
}
wg.Wait()
close(out)
// verify the map is empty now
if l := len(m.ma); l != 0 {
t.Errorf("unexpected map length at test end: %v", l)
}
// confirm that the output always produced the correct sequence
outLists := make([][]string, keyCount)
for s := range out {
sParts := strings.Fields(s)
kn, err := strconv.Atoi(sParts[0])
if err != nil {
t.Fatal(err)
}
outLists[kn] = append(outLists[kn], sParts[1])
}
for kn := 0; kn < keyCount; kn++ {
l := outLists[kn] // list of output for this particular key
for i := 0; i < len(l); i += 2 {
if l[i] != "A" || l[i+1] != "B" {
t.Errorf("For key=%v and i=%v got unexpected values %v and %v", kn, i, l[i], l[i+1])
break
}
}
}
if t.Failed() {
t.Logf("Failed, outLists: %#v", outLists)
}
}
func BenchmarkM(b *testing.B) {
m := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// run uncontended lock/unlock - should be quite fast
m.Lock(i).Unlock()
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With