Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Added] account specific monitoring endpoint(s) #3250

Merged
merged 3 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
176 changes: 121 additions & 55 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ const (

connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
accReqSubj = "$SYS.REQ.ACCOUNT.%s.%s"
accDirectReqSubj = "$SYS.REQ.ACCOUNT.%s.%s"
accPingReqSubj = "$SYS.REQ.ACCOUNT.PING.%s" // atm. only used for STATZ and CONNZ import from system account
// kept for backward compatibility when using http resolver
// this overlaps with the names for events but you'd have to have the operator private key in order to succeed.
accUpdateEventSubjOld = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
Expand All @@ -59,7 +60,6 @@ const (
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"
accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ"

// FIXME(dlc) - Should account scope, even with wc for now, but later on
// we can then shard as needed.
Expand Down Expand Up @@ -138,14 +138,19 @@ const DisconnectEventMsgType = "io.nats.server.advisory.v1.client_disconnect"
// updates in the absence of any changes.
type AccountNumConns struct {
TypedEvent
Server ServerInfo `json:"server"`
Account string `json:"acc"`
Conns int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConns int `json:"total_conns"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
Server ServerInfo `json:"server"`
AccountStat
}

// AccountStat contains the data common between AccountNumConns and AccountStatz
type AccountStat struct {
Account string `json:"acc"`
Conns int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConns int `json:"total_conns"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
}

const AccountNumConnsMsgType = "io.nats.server.advisory.v1.account_connections"
Expand Down Expand Up @@ -869,6 +874,10 @@ func (s *Server) initEventTracking() {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) })
},
"HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &EventFilterOptions{}
s.zReq(c, reply, msg, optz, optz, func() (interface{}, error) { return s.healthz(), nil })
},
}
for name, req := range monSrvc {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
Expand All @@ -880,18 +889,26 @@ func (s *Server) initEventTracking() {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
extractAccount := func(subject string) (string, error) {
extractAccount := func(c *client, subject string, msg []byte) (string, error) {
if tk := strings.Split(subject, tsep); len(tk) != accReqTokens {
return _EMPTY_, fmt.Errorf("subject %q is malformed", subject)
} else {
return tk[accReqAccIndex], nil
acc := tk[accReqAccIndex]
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
// Make sure the accounts match.
if ci.Account != acc {
// Do not leak too much here.
return _EMPTY_, fmt.Errorf("bad request")
}
}
return acc, nil
}
}
monAccSrvc := map[string]msgHandler{
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.SubszOptions.Subscriptions = true
Expand All @@ -903,17 +920,9 @@ func (s *Server) initEventTracking() {
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
// Make sure the accounts match.
if ci.Account != acc {
// Do not leak too much here.
return nil, fmt.Errorf("bad request")
}
optz.ConnzOptions.isAccountReq = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is removed, the whole field isAccountReq in ConnzOptions has no relevance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted isAccountReq and it's usage.
I believe it's only occurrence was redundant with account being set or not.
That usage adjusted the Total, and I believe this should happen even when the request for an account comes from within the system account.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted isAccountReq and it's usage.
I believe it's only occurrence was redundant with account being set or not.
That usage adjusted the Total, and I believe this should happen even when the request for an account comes from within the system account.

}
optz.ConnzOptions.Account = acc
return s.Connz(&optz.ConnzOptions)
}
Expand All @@ -922,7 +931,7 @@ func (s *Server) initEventTracking() {
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.LeafzOptions.Account = acc
Expand All @@ -933,7 +942,7 @@ func (s *Server) initEventTracking() {
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.Account = acc
Expand All @@ -944,21 +953,60 @@ func (s *Server) initEventTracking() {
"INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
return s.accountInfo(acc)
}
})
},
// STATZ is essentially a duplicate of CONNS with an envelope identical to the others.
// For historical reasons CONNS is the odd one out.
// STATZ is also less heavy weight than INFO
"STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else if acc == "PING" { // Filter PING subject. Happens for server as well. But wildcards are not used
return nil, errSkipZreq
} else {
optz.Accounts = []string{acc}
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
return nil, err
} else if len(stz.Accounts) == 0 && !optz.IncludeUnused {
return nil, errSkipZreq
} else {
return stz, nil
}
}
})
},
"CONNS": s.connsRequest,
}
for name, req := range monAccSrvc {
if _, err := s.sysSubscribe(fmt.Sprintf(accReqSubj, "*", name), req); err != nil {
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), req); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
}

// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
return nil, err
} else if len(stz.Accounts) == 0 && !optz.IncludeUnused {
return nil, errSkipZreq
} else {
return stz, nil
}
})
}); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}

// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
Expand Down Expand Up @@ -1005,10 +1053,14 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
if !s.EventsEnabled() {
return
}
accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ")
accConnzSubj := fmt.Sprintf(accDirectReqSubj, "*", "CONNZ")
if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err)
}
accStatzSubj := fmt.Sprintf(accDirectReqSubj, "*", "STATZ")
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
if err := sacc.AddServiceExportWithResponse(accStatzSubj, Streamed, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", accStatzSubj, err)
}
// Register any accounts that existed prior.
s.registerSystemImportsForExisting()

Expand Down Expand Up @@ -1369,6 +1421,12 @@ type AccountzEventOptions struct {
EventFilterOptions
}

// In the context of system events, AccountzEventOptions are options passed to Accountz
type AccountStatzEventOptions struct {
AccountStatzOptions
EventFilterOptions
}

// In the context of system events, JszEventOptions are options passed to Jsz
type JszEventOptions struct {
JSzOptions
Expand Down Expand Up @@ -1572,21 +1630,21 @@ func (s *Server) registerSystemImports(a *Account) {
return
}
// FIXME(dlc) - make a shared list between sys exports etc.
connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ")
mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ")

// Add in this to the account in 2 places.
// "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ"
if !a.serviceImportExists(connzSubj) {
if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service imports for account: %v", err)
}
}
if !a.serviceImportExists(accConnzReqSubj) {
if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service imports for account: %v", err)
importSrvc := func(subj, mappedSubj string) {
if !a.serviceImportExists(subj) {
if err := a.AddServiceImport(sacc, subj, mappedSubj); err != nil {
s.Errorf("Error setting up system service import %s -> %s for account: %v",
subj, mappedSubj, err)
}
}
}
// Add in this to the account in 2 places.
// "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ"
mappedConnzSubj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNZ")
importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ"))
}

// Setup tracking for this account. This allows us to track global account activity.
Expand All @@ -1600,7 +1658,7 @@ func (s *Server) enableAccountTracking(a *Account) {
// May need to ensure we do so only if there is a known interest.
// This can get complicated with gateways.

subj := fmt.Sprintf(accReqSubj, a.Name, "CONNS")
subj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNS")
reply := fmt.Sprintf(connsRespSubj, s.info.ID)
m := accNumConnsReq{Account: a.Name}
s.sendInternalMsg(subj, reply, &m.Server, &m)
Expand Down Expand Up @@ -1643,29 +1701,18 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
// Build event with account name and number of local clients and leafnodes.
eid := s.nextEventID()
a.mu.Lock()
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
m := &AccountNumConns{
stat := a.statz()
m := AccountNumConns{
TypedEvent: TypedEvent{
Type: AccountNumConnsMsgType,
ID: eid,
Time: time.Now().UTC(),
},
Account: a.Name,
Conns: localConns,
LeafNodes: leafConns,
TotalConns: localConns + leafConns,
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes)},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes)},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
AccountStat: *stat,
}
// Set timer to fire again unless we are at zero, but only if the account
// is not configured for JetStream.
if localConns == 0 && !a.jetStreamConfiguredNoLock() {
if m.TotalConns == 0 && !a.jetStreamConfiguredNoLock() {
clearTimer(&a.ctmr)
} else {
// Check to see if we have an HB running and update.
Expand All @@ -1682,6 +1729,25 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
a.mu.Unlock()
}

// Lock shoulc be held on entry
func (a *Account) statz() *AccountStat {
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
return &AccountStat{
Account: a.Name,
Conns: localConns,
LeafNodes: leafConns,
TotalConns: localConns + leafConns,
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes)},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes)},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
}
}

// accConnsUpdate is called whenever there is a change to the account's
// number of active connections, or during a heartbeat.
func (s *Server) accConnsUpdate(a *Account) {
Expand Down