Fix races in IAM cache lazy loading (#19346)
Fix races in IAM cache Fixes #19344 On the top level we only grab a read lock, but we write to the cache if we manage to fetch it. https://github.com/minio/minio/blob/a03dac41ebd2c1b9d3f1110ef5d299ffebb8f87b/cmd/iam-store.go#L446 is also flipped to what it should be AFAICT. Change the internal cache structure to a concurrency safe implementation. Bonus: Also switch grid implementation.
This commit is contained in:
@@ -42,6 +42,7 @@ import (
|
||||
xioutil "github.com/minio/minio/internal/ioutil"
|
||||
"github.com/minio/minio/internal/logger"
|
||||
"github.com/minio/minio/internal/pubsub"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"github.com/tinylib/msgp/msgp"
|
||||
"github.com/zeebo/xxh3"
|
||||
)
|
||||
@@ -75,10 +76,10 @@ type Connection struct {
|
||||
ctx context.Context
|
||||
|
||||
// Active mux connections.
|
||||
outgoing *lockedClientMap
|
||||
outgoing *xsync.MapOf[uint64, *muxClient]
|
||||
|
||||
// Incoming streams
|
||||
inStream *lockedServerMap
|
||||
inStream *xsync.MapOf[uint64, *muxServer]
|
||||
|
||||
// outQueue is the output queue
|
||||
outQueue chan []byte
|
||||
@@ -205,8 +206,8 @@ func newConnection(o connectionParams) *Connection {
|
||||
Local: o.local,
|
||||
id: o.id,
|
||||
ctx: o.ctx,
|
||||
outgoing: &lockedClientMap{m: make(map[uint64]*muxClient, 1000)},
|
||||
inStream: &lockedServerMap{m: make(map[uint64]*muxServer, 1000)},
|
||||
outgoing: xsync.NewMapOfPresized[uint64, *muxClient](1000),
|
||||
inStream: xsync.NewMapOfPresized[uint64, *muxServer](1000),
|
||||
outQueue: make(chan []byte, defaultOutQueue),
|
||||
dialer: o.dial,
|
||||
side: ws.StateServerSide,
|
||||
|
||||
@@ -146,142 +146,3 @@ func bytesOrLength(b []byte) string {
|
||||
}
|
||||
return fmt.Sprint(b)
|
||||
}
|
||||
|
||||
type lockedClientMap struct {
|
||||
m map[uint64]*muxClient
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) Load(id uint64) (*muxClient, bool) {
|
||||
m.mu.Lock()
|
||||
v, ok := m.m[id]
|
||||
m.mu.Unlock()
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) LoadAndDelete(id uint64) (*muxClient, bool) {
|
||||
m.mu.Lock()
|
||||
v, ok := m.m[id]
|
||||
if ok {
|
||||
delete(m.m, id)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) Size() int {
|
||||
m.mu.Lock()
|
||||
v := len(m.m)
|
||||
m.mu.Unlock()
|
||||
return v
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) Delete(id uint64) {
|
||||
m.mu.Lock()
|
||||
delete(m.m, id)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) Range(fn func(key uint64, value *muxClient) bool) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
for k, v := range m.m {
|
||||
if !fn(k, v) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) Clear() {
|
||||
m.mu.Lock()
|
||||
m.m = map[uint64]*muxClient{}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *lockedClientMap) LoadOrStore(id uint64, v *muxClient) (*muxClient, bool) {
|
||||
m.mu.Lock()
|
||||
v2, ok := m.m[id]
|
||||
if ok {
|
||||
m.mu.Unlock()
|
||||
return v2, true
|
||||
}
|
||||
m.m[id] = v
|
||||
m.mu.Unlock()
|
||||
return v, false
|
||||
}
|
||||
|
||||
type lockedServerMap struct {
|
||||
m map[uint64]*muxServer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) Load(id uint64) (*muxServer, bool) {
|
||||
m.mu.Lock()
|
||||
v, ok := m.m[id]
|
||||
m.mu.Unlock()
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) LoadAndDelete(id uint64) (*muxServer, bool) {
|
||||
m.mu.Lock()
|
||||
v, ok := m.m[id]
|
||||
if ok {
|
||||
delete(m.m, id)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) Size() int {
|
||||
m.mu.Lock()
|
||||
v := len(m.m)
|
||||
m.mu.Unlock()
|
||||
return v
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) Delete(id uint64) {
|
||||
m.mu.Lock()
|
||||
delete(m.m, id)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) Range(fn func(key uint64, value *muxServer) bool) {
|
||||
m.mu.Lock()
|
||||
for k, v := range m.m {
|
||||
if !fn(k, v) {
|
||||
break
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) Clear() {
|
||||
m.mu.Lock()
|
||||
m.m = map[uint64]*muxServer{}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) LoadOrStore(id uint64, v *muxServer) (*muxServer, bool) {
|
||||
m.mu.Lock()
|
||||
v2, ok := m.m[id]
|
||||
if ok {
|
||||
m.mu.Unlock()
|
||||
return v2, true
|
||||
}
|
||||
m.m[id] = v
|
||||
m.mu.Unlock()
|
||||
return v, false
|
||||
}
|
||||
|
||||
func (m *lockedServerMap) LoadOrCompute(id uint64, fn func() *muxServer) (*muxServer, bool) {
|
||||
m.mu.Lock()
|
||||
v2, ok := m.m[id]
|
||||
if ok {
|
||||
m.mu.Unlock()
|
||||
return v2, true
|
||||
}
|
||||
v := fn()
|
||||
m.m[id] = v
|
||||
m.mu.Unlock()
|
||||
return v, false
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user