Skip to content

Commit

Permalink
[Added] account specific monitoring endpoint(s) (#3250)
Browse files Browse the repository at this point in the history
Added http monitoring endpoint /accstatz
It responds with a list of statz for all accounts with local connections
the argument "unused=1" can be provided to get statz for all accounts
This endpoint is also exposed as nats request under:

This monitoring endpoint is exposed via the system account.
$SYS.REQ.ACCOUNT.*.STATZ
Each server will respond with connection statistics for the requested
account. The format of the data section is a list (size 1) identical to the event
$SYS.ACCOUNT.%s.SERVER.CONNS which is sent periodically as well as on
connect/disconnect. Unless requested by options, server without the account,
or server where the account has no local connections, will not respond.

A PING endpoint exists as well. The response format is identical to
$SYS.REQ.ACCOUNT.*.STATZ
(however the data section will contain more than one account, if they exist)
In addition to general filter options the request takes a list of accounts and
an argument to include accounts without local connections (disabled by default)
$SYS.REQ.ACCOUNT.PING.STATZ

Each account has a new system account import where the local subject
$SYS.REQ.ACCOUNT.PING.STATZ essentially responds as if
the importing account name was used for $SYS.REQ.ACCOUNT.*.STATZ

The only difference between requesting ACCOUNT.PING.STATZ from within
the system account and an account is that the later can only retrieve
statz for the account the client requests from.

Also exposed the monitoring /healthz via the system account under
$SYS.REQ.SERVER.*.HEALTHZ
$SYS.REQ.SERVER.PING.HEALTHZ
No dedicated options are available for these.
HEALTHZ also accept general filter options.

Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed Jul 12, 2022
1 parent 520d323 commit d53d2d0
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 178 deletions.
176 changes: 121 additions & 55 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ const (

connectEventSubj = "$SYS.ACCOUNT.%s.CONNECT"
disconnectEventSubj = "$SYS.ACCOUNT.%s.DISCONNECT"
accReqSubj = "$SYS.REQ.ACCOUNT.%s.%s"
accDirectReqSubj = "$SYS.REQ.ACCOUNT.%s.%s"
accPingReqSubj = "$SYS.REQ.ACCOUNT.PING.%s" // atm. only used for STATZ and CONNZ import from system account
// kept for backward compatibility when using http resolver
// this overlaps with the names for events but you'd have to have the operator private key in order to succeed.
accUpdateEventSubjOld = "$SYS.ACCOUNT.%s.CLAIMS.UPDATE"
Expand All @@ -59,7 +60,6 @@ const (
leafNodeConnectEventSubj = "$SYS.ACCOUNT.%s.LEAFNODE.CONNECT" // for internal use only
remoteLatencyEventSubj = "$SYS.LATENCY.M2.%s"
inboxRespSubj = "$SYS._INBOX.%s.%s"
accConnzReqSubj = "$SYS.REQ.ACCOUNT.PING.CONNZ"

// FIXME(dlc) - Should account scope, even with wc for now, but later on
// we can then shard as needed.
Expand Down Expand Up @@ -138,14 +138,19 @@ const DisconnectEventMsgType = "io.nats.server.advisory.v1.client_disconnect"
// updates in the absence of any changes.
type AccountNumConns struct {
TypedEvent
Server ServerInfo `json:"server"`
Account string `json:"acc"`
Conns int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConns int `json:"total_conns"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
Server ServerInfo `json:"server"`
AccountStat
}

// AccountStat contains the data common between AccountNumConns and AccountStatz
type AccountStat struct {
Account string `json:"acc"`
Conns int `json:"conns"`
LeafNodes int `json:"leafnodes"`
TotalConns int `json:"total_conns"`
Sent DataStats `json:"sent"`
Received DataStats `json:"received"`
SlowConsumers int64 `json:"slow_consumers"`
}

const AccountNumConnsMsgType = "io.nats.server.advisory.v1.account_connections"
Expand Down Expand Up @@ -869,6 +874,10 @@ func (s *Server) initEventTracking() {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) { return s.Jsz(&optz.JSzOptions) })
},
"HEALTHZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &EventFilterOptions{}
s.zReq(c, reply, msg, optz, optz, func() (interface{}, error) { return s.healthz(), nil })
},
}
for name, req := range monSrvc {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
Expand All @@ -880,18 +889,26 @@ func (s *Server) initEventTracking() {
s.Errorf("Error setting up internal tracking: %v", err)
}
}
extractAccount := func(subject string) (string, error) {
extractAccount := func(c *client, subject string, msg []byte) (string, error) {
if tk := strings.Split(subject, tsep); len(tk) != accReqTokens {
return _EMPTY_, fmt.Errorf("subject %q is malformed", subject)
} else {
return tk[accReqAccIndex], nil
acc := tk[accReqAccIndex]
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
// Make sure the accounts match.
if ci.Account != acc {
// Do not leak too much here.
return _EMPTY_, fmt.Errorf("bad request")
}
}
return acc, nil
}
}
monAccSrvc := map[string]msgHandler{
"SUBSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &SubszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.SubszOptions.Subscriptions = true
Expand All @@ -903,17 +920,9 @@ func (s *Server) initEventTracking() {
"CONNZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &ConnzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
if ci, _, _, _, err := c.srv.getRequestInfo(c, msg); err == nil && ci.Account != _EMPTY_ {
// Make sure the accounts match.
if ci.Account != acc {
// Do not leak too much here.
return nil, fmt.Errorf("bad request")
}
optz.ConnzOptions.isAccountReq = true
}
optz.ConnzOptions.Account = acc
return s.Connz(&optz.ConnzOptions)
}
Expand All @@ -922,7 +931,7 @@ func (s *Server) initEventTracking() {
"LEAFZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.LeafzOptions.Account = acc
Expand All @@ -933,7 +942,7 @@ func (s *Server) initEventTracking() {
"JSZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &JszEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
optz.Account = acc
Expand All @@ -944,21 +953,60 @@ func (s *Server) initEventTracking() {
"INFO": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else {
return s.accountInfo(acc)
}
})
},
// STATZ is essentially a duplicate of CONNS with an envelope identical to the others.
// For historical reasons CONNS is the odd one out.
// STATZ is also less heavy weight than INFO
"STATZ": func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(c, subject, msg); err != nil {
return nil, err
} else if acc == "PING" { // Filter PING subject. Happens for server as well. But wildcards are not used
return nil, errSkipZreq
} else {
optz.Accounts = []string{acc}
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
return nil, err
} else if len(stz.Accounts) == 0 && !optz.IncludeUnused {
return nil, errSkipZreq
} else {
return stz, nil
}
}
})
},
"CONNS": s.connsRequest,
}
for name, req := range monAccSrvc {
if _, err := s.sysSubscribe(fmt.Sprintf(accReqSubj, "*", name), req); err != nil {
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), req); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
}

// For now only the STATZ subject has an account specific ping equivalent.
if _, err := s.sysSubscribe(fmt.Sprintf(accPingReqSubj, "STATZ"),
func(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
optz := &AccountStatzEventOptions{}
s.zReq(c, reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if stz, err := s.AccountStatz(&optz.AccountStatzOptions); err != nil {
return nil, err
} else if len(stz.Accounts) == 0 && !optz.IncludeUnused {
return nil, errSkipZreq
} else {
return stz, nil
}
})
}); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}

// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
Expand Down Expand Up @@ -1005,10 +1053,14 @@ func (s *Server) addSystemAccountExports(sacc *Account) {
if !s.EventsEnabled() {
return
}
accConnzSubj := fmt.Sprintf(accReqSubj, "*", "CONNZ")
accConnzSubj := fmt.Sprintf(accDirectReqSubj, "*", "CONNZ")
if err := sacc.AddServiceExportWithResponse(accConnzSubj, Streamed, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", accConnzSubj, err)
}
accStatzSubj := fmt.Sprintf(accDirectReqSubj, "*", "STATZ")
if err := sacc.AddServiceExportWithResponse(accStatzSubj, Streamed, nil); err != nil {
s.Errorf("Error adding system service export for %q: %v", accStatzSubj, err)
}
// Register any accounts that existed prior.
s.registerSystemImportsForExisting()

Expand Down Expand Up @@ -1369,6 +1421,12 @@ type AccountzEventOptions struct {
EventFilterOptions
}

// In the context of system events, AccountzEventOptions are options passed to Accountz
type AccountStatzEventOptions struct {
AccountStatzOptions
EventFilterOptions
}

// In the context of system events, JszEventOptions are options passed to Jsz
type JszEventOptions struct {
JSzOptions
Expand Down Expand Up @@ -1572,21 +1630,21 @@ func (s *Server) registerSystemImports(a *Account) {
return
}
// FIXME(dlc) - make a shared list between sys exports etc.
connzSubj := fmt.Sprintf(serverPingReqSubj, "CONNZ")
mappedSubj := fmt.Sprintf(accReqSubj, a.Name, "CONNZ")

// Add in this to the account in 2 places.
// "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ"
if !a.serviceImportExists(connzSubj) {
if err := a.AddServiceImport(sacc, connzSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service imports for account: %v", err)
}
}
if !a.serviceImportExists(accConnzReqSubj) {
if err := a.AddServiceImport(sacc, accConnzReqSubj, mappedSubj); err != nil {
s.Errorf("Error setting up system service imports for account: %v", err)
importSrvc := func(subj, mappedSubj string) {
if !a.serviceImportExists(subj) {
if err := a.AddServiceImport(sacc, subj, mappedSubj); err != nil {
s.Errorf("Error setting up system service import %s -> %s for account: %v",
subj, mappedSubj, err)
}
}
}
// Add in this to the account in 2 places.
// "$SYS.REQ.SERVER.PING.CONNZ" and "$SYS.REQ.ACCOUNT.PING.CONNZ"
mappedConnzSubj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNZ")
importSrvc(fmt.Sprintf(accPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(serverPingReqSubj, "CONNZ"), mappedConnzSubj)
importSrvc(fmt.Sprintf(accPingReqSubj, "STATZ"), fmt.Sprintf(accDirectReqSubj, a.Name, "STATZ"))
}

// Setup tracking for this account. This allows us to track global account activity.
Expand All @@ -1600,7 +1658,7 @@ func (s *Server) enableAccountTracking(a *Account) {
// May need to ensure we do so only if there is a known interest.
// This can get complicated with gateways.

subj := fmt.Sprintf(accReqSubj, a.Name, "CONNS")
subj := fmt.Sprintf(accDirectReqSubj, a.Name, "CONNS")
reply := fmt.Sprintf(connsRespSubj, s.info.ID)
m := accNumConnsReq{Account: a.Name}
s.sendInternalMsg(subj, reply, &m.Server, &m)
Expand Down Expand Up @@ -1643,29 +1701,18 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
// Build event with account name and number of local clients and leafnodes.
eid := s.nextEventID()
a.mu.Lock()
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
m := &AccountNumConns{
stat := a.statz()
m := AccountNumConns{
TypedEvent: TypedEvent{
Type: AccountNumConnsMsgType,
ID: eid,
Time: time.Now().UTC(),
},
Account: a.Name,
Conns: localConns,
LeafNodes: leafConns,
TotalConns: localConns + leafConns,
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes)},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes)},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
AccountStat: *stat,
}
// Set timer to fire again unless we are at zero, but only if the account
// is not configured for JetStream.
if localConns == 0 && !a.jetStreamConfiguredNoLock() {
if m.TotalConns == 0 && !a.jetStreamConfiguredNoLock() {
clearTimer(&a.ctmr)
} else {
// Check to see if we have an HB running and update.
Expand All @@ -1682,6 +1729,25 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
a.mu.Unlock()
}

// Lock shoulc be held on entry
func (a *Account) statz() *AccountStat {
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
return &AccountStat{
Account: a.Name,
Conns: localConns,
LeafNodes: leafConns,
TotalConns: localConns + leafConns,
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes)},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes)},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
}
}

// accConnsUpdate is called whenever there is a change to the account's
// number of active connections, or during a heartbeat.
func (s *Server) accConnsUpdate(a *Account) {
Expand Down

0 comments on commit d53d2d0

Please sign in to comment.