From 0fae8067aee4a292923563afeca76e59a9b1484c Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 8 Mar 2022 17:52:56 -0700 Subject: [PATCH] [FIXED] Some lock inversions The established ordering is client -> Account, so fixed a few places where we had Account -> client. Added a new file, locksordering.txt with the list of known ordering for some of the objects. Signed-off-by: Ivan Kozlovic --- locksordering.txt | 5 ++ server/accounts.go | 94 +++++++++++++++++++++--------------- server/jetstream_test.go | 4 +- server/monitor_test.go | 1 - test/service_latency_test.go | 2 - 5 files changed, 63 insertions(+), 43 deletions(-) create mode 100644 locksordering.txt diff --git a/locksordering.txt b/locksordering.txt new file mode 100644 index 0000000000..fec4b25097 --- /dev/null +++ b/locksordering.txt @@ -0,0 +1,5 @@ +Here is the list of some established lock ordering. + +In this list, A -> B means that you can have A.Lock() then B.Lock(), not the opposite. + +jetStream -> jsAccount -> Server -> client -> Account diff --git a/server/accounts.go b/server/accounts.go index 38b20270a1..36de78d234 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -292,6 +292,27 @@ func (a *Account) nextEventID() string { return id } +// Returns a slice of clients stored in the account, or nil if none is present. +// Lock is held on entry. +func (a *Account) getClientsLocked() []*client { + if len(a.clients) == 0 { + return nil + } + clients := make([]*client, 0, len(a.clients)) + for c := range a.clients { + clients = append(clients, c) + } + return clients +} + +// Returns a slice of clients stored in the account, or nil if none is present. +func (a *Account) getClients() []*client { + a.mu.RLock() + clients := a.getClientsLocked() + a.mu.RUnlock() + return clients +} + // Called to track a remote server and connections and leafnodes it // has for this account. 
func (a *Account) updateRemoteServer(m *AccountNumConns) []*client { @@ -312,10 +333,7 @@ func (a *Account) updateRemoteServer(m *AccountNumConns) []*client { // conservative and bit harsh here. Clients will reconnect if we over compensate. var clients []*client if mtce { - clients = make([]*client, 0, len(a.clients)) - for c := range a.clients { - clients = append(clients, c) - } + clients = a.getClientsLocked() sort.Slice(clients, func(i, j int) bool { return clients[i].start.After(clients[j].start) }) @@ -670,9 +688,13 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error { // If we have connected leafnodes make sure to update. if len(a.lleafs) > 0 { - for _, lc := range a.lleafs { + leafs := append([]*client(nil), a.lleafs...) + // Need to release because lock ordering is client -> account + a.mu.Unlock() + for _, lc := range leafs { lc.forceAddToSmap(src) } + a.mu.Lock() } return nil } @@ -963,8 +985,6 @@ func (a *Account) addServiceExportWithResponseAndAccountPos( } a.mu.Lock() - defer a.mu.Unlock() - if a.exports.services == nil { a.exports.services = make(map[string]*serviceExport) } @@ -981,6 +1001,7 @@ func (a *Account) addServiceExportWithResponseAndAccountPos( if accounts != nil || accountPos > 0 { if err := setExportAuth(&se.exportAuth, subject, accounts, accountPos); err != nil { + a.mu.Unlock() return err } } @@ -988,8 +1009,16 @@ func (a *Account) addServiceExportWithResponseAndAccountPos( se.acc = a se.respThresh = DEFAULT_SERVICE_EXPORT_RESPONSE_THRESHOLD a.exports.services[subject] = se - if nlrt := a.lowestServiceExportResponseTime(); nlrt != lrt { - a.updateAllClientsServiceExportResponseTime(nlrt) + + var clients []*client + nlrt := a.lowestServiceExportResponseTime() + if nlrt != lrt && len(a.clients) > 0 { + clients = a.getClientsLocked() + } + // Need to release because lock ordering is client -> Account + a.mu.Unlock() + if len(clients) > 0 { + updateAllClientsServiceExportResponseTime(clients, nlrt) } return nil } 
@@ -1353,9 +1382,8 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool // This will check to make sure our response lower threshold is set // properly in any clients doing rrTracking. -// Lock should be held. -func (a *Account) updateAllClientsServiceExportResponseTime(lrt time.Duration) { - for c := range a.clients { +func updateAllClientsServiceExportResponseTime(clients []*client, lrt time.Duration) { + for _, c := range clients { c.mu.Lock() if c.rrTracking != nil && lrt != c.rrTracking.lrt { c.rrTracking.lrt = lrt @@ -2234,18 +2262,27 @@ func (a *Account) ServiceExportResponseThreshold(export string) (time.Duration, // from a service export responder. func (a *Account) SetServiceExportResponseThreshold(export string, maxTime time.Duration) error { a.mu.Lock() - defer a.mu.Unlock() if a.isClaimAccount() { + a.mu.Unlock() return fmt.Errorf("claim based accounts can not be updated directly") } lrt := a.lowestServiceExportResponseTime() se := a.getServiceExport(export) if se == nil { + a.mu.Unlock() return fmt.Errorf("no export defined for %q", export) } se.respThresh = maxTime - if nlrt := a.lowestServiceExportResponseTime(); nlrt != lrt { - a.updateAllClientsServiceExportResponseTime(nlrt) + + var clients []*client + nlrt := a.lowestServiceExportResponseTime() + if nlrt != lrt && len(a.clients) > 0 { + clients = a.getClientsLocked() + } + // Need to release because lock ordering is client -> Account + a.mu.Unlock() + if len(clients) > 0 { + updateAllClientsServiceExportResponseTime(clients, nlrt) } return nil } @@ -2569,10 +2606,7 @@ func (a *Account) streamActivationExpired(exportAcc *Account, subject string) { a.mu.Lock() si.invalid = true - clients := make([]*client, 0, len(a.clients)) - for c := range a.clients { - clients = append(clients, c) - } + clients := a.getClientsLocked() awcsti := map[string]struct{}{a.Name: {}} a.mu.Unlock() for _, c := range clients { @@ -2779,13 +2813,7 @@ func (a *Account) expiredTimeout() { 
a.mu.Unlock() // Collect the clients and expire them. - cs := make([]*client, 0, len(a.clients)) - a.mu.RLock() - for c := range a.clients { - cs = append(cs, c) - } - a.mu.RUnlock() - + cs := a.getClients() for _, c := range cs { c.accountAuthExpired() } @@ -3001,16 +3029,6 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim s.registerSystemImports(a) } - gatherClients := func() []*client { - a.mu.RLock() - clients := make([]*client, 0, len(a.clients)) - for c := range a.clients { - clients = append(clients, c) - } - a.mu.RUnlock() - return clients - } - jsEnabled := s.JetStreamEnabled() if jsEnabled && a == s.SystemAccount() { s.checkJetStreamExports() @@ -3144,7 +3162,7 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim // Now let's apply any needed changes from import/export changes. if !a.checkStreamImportsEqual(old) { awcsti := map[string]struct{}{a.Name: {}} - for _, c := range gatherClients() { + for _, c := range a.getClients() { c.processSubsOnConfigReload(awcsti) } } @@ -3266,9 +3284,9 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim } a.updated = time.Now().UTC() + clients := a.getClientsLocked() a.mu.Unlock() - clients := gatherClients() // Sort if we are over the limit. 
if a.MaxTotalConnectionsReached() { sort.Slice(clients, func(i, j int) bool { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c22cce1005..e715241633 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -4524,7 +4524,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { if err != nil { t.Fatalf("Expected to find a stream for %q", mname) } - state = mset.state() + mset.state() mset.delete() rreq.Config.Name = "NEW_STREAM" @@ -15195,7 +15195,7 @@ func TestJetStreamPullConsumerHeartBeats(t *testing.T) { } }() - start, msgs = time.Now(), doReq(10, 75*time.Millisecond, 350*time.Millisecond, 6) + msgs = doReq(10, 75*time.Millisecond, 350*time.Millisecond, 6) // The first 5 should be msgs, no HBs. for i := 0; i < 5; i++ { if m := msgs[i].msg; len(m.Header) > 0 { diff --git a/server/monitor_test.go b/server/monitor_test.go index 446b0a3c54..77976a5816 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -677,7 +677,6 @@ func TestConnzLastActivity(t *testing.T) { if barLA.Equal(nextLA) { t.Fatalf("Publish should have triggered update to LastActivity\n") } - barLA = nextLA // Message delivery on ncFoo should have triggered as well. nextLA = ciFoo.LastActivity diff --git a/test/service_latency_test.go b/test/service_latency_test.go index 5f47db8222..abb1b6710e 100644 --- a/test/service_latency_test.go +++ b/test/service_latency_test.go @@ -361,7 +361,6 @@ func TestServiceLatencyClientRTTSlowerVsServiceRTT(t *testing.T) { } // Send the request. - start = time.Now() _, err := nc2.Request("ngs.usage", []byte("1h"), time.Second) if err != nil { t.Fatalf("Expected a response") @@ -1500,7 +1499,6 @@ func TestServiceLatencyRequestorSharesConfig(t *testing.T) { t.Fatalf("Error on server reload: %v", err) } - start = time.Now() if _, err = nc2.Request("SVC", []byte("1h"), time.Second); err != nil { t.Fatalf("Expected a response") }