Skip to content

Commit

Permalink
Move to one message per server
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed Jul 11, 2022
1 parent c6e3562 commit 1f345b8
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 86 deletions.
111 changes: 35 additions & 76 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,6 @@ func (s *Server) initEventTracking() {
optz := &AccountzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Accountz(&optz.AccountzOptions) })
},
"ACC_STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.AccountStatz(&optz.AccountStatzOptions) })
},
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) })
Expand All @@ -893,18 +889,26 @@ func (s *Server) initEventTracking() {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
extractAccount := func(subject string) (string, error) {
extractAccount := func(c *client, subject string, msg []byte) (string, error) {
if tk := strings.Split(subject, tsep); len(tk) != accReqTokens {
return _EMPTY_, fmt.Errorf("subject %q is malformed", subject)
} else {
acc := tk[accReqAccIndex]
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
// Make sure the accounts match.
if ci.Account != acc {
// Do not leak too much here.
return _EMPTY_, fmt.Errorf("bad request")
}
}
return tk[accReqAccIndex], nil
}
}
monAccSrvc := map[string]msgHandler{
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.SubszOptions.Subscriptions = true
Expand All @@ -916,17 +920,9 @@ func (s *Server) initEventTracking() {
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
// Make sure the accounts match.
if ci.Account != acc {
// Do not leak too much here.
return nil, fmt.Errorf("bad request")
}
optz.ConnzOptions.isAccountReq = true
}
optz.ConnzOptions.Account = acc
return s.Connz(&optz.ConnzOptions)
}
Expand All @@ -935,7 +931,7 @@ func (s *Server) initEventTracking() {
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.LeafzOptions.Account = acc
Expand All @@ -946,7 +942,7 @@ func (s *Server) initEventTracking() {
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.Account = acc
Expand All @@ -957,7 +953,7 @@ func (s *Server) initEventTracking() {
"INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
return s.accountInfo(acc)
Expand All @@ -968,18 +964,20 @@ func (s *Server) initEventTracking() {
// For historical reasons CONNS is the odd one out.
// STATZ is also less heavy weight than INFO
"STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &EventFilterOptions{}
s.zReq(c, reply, msg, optz, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else if acc == "PING" { // Filter ping subject, this happens for server too, but filter is less explicit
return nil, errSkipZreq
} else {
if a, ok := s.accounts.Load(acc); !ok {
optz.Accounts = []string{acc}
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
return nil, err
} else if len(stz.Accounts) == 0 && !optz.IncludeUnused {
return nil, errSkipZreq
} else {
a := a.(*Account)
a.mu.Lock()
defer a.mu.Unlock()
return a.accConns(), nil
return stz, nil
}
}
})
Expand All @@ -992,59 +990,20 @@ func (s *Server) initEventTracking() {
}
}

// For now only the CONNS subject has an account specific ping equivalent.
// TODO (mh) turn this into a zReq style function once a second one is added
// Other account specific ones are special cases of server specific serverStatsPingReqSubj
// and thus don't need to be exposed again
// This implementation also differs from the server counterparts as follows:
// there: one server one message.
// here: one server responds with one message per (requested) account
// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
func(sub *subscription, c *client, acc *Account, subject, reply string, rmsg []byte) {
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccountStatzEventOptions{}
var err error
_, msg := c.msgParts(rmsg)
if len(msg) != 0 {
if err = json.Unmarshal(msg, optz); err != nil {
s.sendInternalResponse(reply,
&ServerAPIResponse{
Server: &ServerInfo{},
Error: &ApiError{Code: http.StatusBadRequest, Description: err.Error()},
},
)
return
} else if s.filterRequest(&optz.EventFilterOptions) {
return
}
}
accountRespond := func(acc *Account) {
if !optz.AccountStatzOptions.IncludeUnused && acc.NumLocalConnections() == 0 {
return
}
acc.mu.Lock()
data := acc.accConns()
acc.mu.Unlock()
s.sendInternalResponse(reply,
&ServerAPIResponse{
Server: &ServerInfo{},
Data: data,
},
)
}
if len(optz.Accounts) == 0 {
s.accounts.Range(func(accName, a interface{}) bool {
accountRespond(a.(*Account))
return true
})
} else {
for _, accName := range optz.Accounts {
if a, ok := s.accounts.Load(accName); ok {
accountRespond(a.(*Account))
}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
return nil, err
} else if len(stz.Accounts) == 0 && !optz.IncludeUnused {
return nil, errSkipZreq
} else {
return stz, nil
}
}
},
); err != nil {
})
}); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}

Expand Down
16 changes: 7 additions & 9 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,13 +1295,11 @@ func TestAccountReqMonitoring(t *testing.T) {
require_NoError(t, err)
require_NoError(t, ncSys.PublishRequest(pStatz, rIb, nil))
minRespContentForBothAcc := []string{`"conns":1,`, `"total_conns":1`, `"slow_consumers":0`, `"acc":"`}
// expect one response per account
for i := 0; i < 2; i++ {
m, err := rSub.NextMsg(time.Second)
require_NoError(t, err)
// due to ordering skip check for sent/received etc..
require_Contains(t, string(m.Data), minRespContentForBothAcc...)
}
resp, err = rSub.NextMsg(time.Second)
require_NoError(t, err)
require_Contains(t, string(resp.Data), minRespContentForBothAcc...)
// expect one entry per account
require_Contains(t, string(resp.Data), fmt.Sprintf(`"acc":"%s"`, acc.Name), fmt.Sprintf(`"acc":"%s"`, sacc.Name))

// Test ping with filter by account name
require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, sacc.Name))))
Expand All @@ -1326,9 +1324,9 @@ func TestAccountReqMonitoring(t *testing.T) {

require_NoError(t, ncSys.PublishRequest(pStatz, rIb,
[]byte(fmt.Sprintf(`{"accounts":["%s"], "include_unused":true}`, unusedAcc.Name))))
m, err = rSub.NextMsg(time.Second)
resp, err = rSub.NextMsg(time.Second)
require_NoError(t, err)
require_Contains(t, string(m.Data), unusedContent...)
require_Contains(t, string(resp.Data), unusedContent...)

require_NoError(t, ncSys.PublishRequest(pStatz, rIb, []byte(fmt.Sprintf(`{"accounts":["%s"]}`, unusedAcc.Name))))
_, err = rSub.NextMsg(200 * time.Millisecond)
Expand Down
4 changes: 3 additions & 1 deletion server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2193,7 +2193,9 @@ func (s *Server) AccountStatz(opts *AccountStatzOptions) (*AccountStatz, error)
if acc, ok := s.accounts.Load(a); ok {
acc := acc.(*Account)
acc.mu.RLock()
stz.Accounts = append(stz.Accounts, acc.accConns())
if opts.IncludeUnused || acc.numLocalConnections() != 0 {
stz.Accounts = append(stz.Accounts, acc.accConns())
}
acc.mu.RUnlock()
}
}
Expand Down

0 comments on commit 1f345b8

Please sign in to comment.