Skip to content

Commit

Permalink
changed format of JSClusterNoPeers error (#3459)
Browse files Browse the repository at this point in the history
* changed format of JSClusterNoPeers error

This error was introduced in #3342 and reveals too much information.
This change gets rid of cluster names and peer counts.

All other counts were changed to booleans,
which are only included in the output when the filter was hit.

In addition, the set of not matching tags is included.
Furthermore, the static error description in server/errors.json 
is moved into selectPeerError

sample errors:
1) no suitable peers for placement, tags not matched ['cloud:GCP', 'country:US']
2) no suitable peers for placement, insufficient storage

Signed-off-by: Matthias Hanel <mh@synadia.com>
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
Co-authored-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
matthiashanel and kozlovic committed Sep 9, 2022
1 parent 50d62ce commit f7cb5b1
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 58 deletions.
6 changes: 3 additions & 3 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@
"constant": "JSClusterNoPeersErrF",
"code": 400,
"error_code": 10005,
"description": "no suitable peers for placement: {err}",
"comment": "",
"description": "{err}",
"comment": "Error causing no peers to be available",
"help": "",
"url": "",
"deprecates": ""
Expand Down Expand Up @@ -1299,4 +1299,4 @@
"url": "",
"deprecates": ""
}
]
]
9 changes: 5 additions & 4 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2395,18 +2395,19 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
}
return true
})
errs := selectPeerErrors{e}
errs := &selectPeerError{}
errs.accumulate(e)
for cluster := range clusters {
newPeers, _ := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0)
newPeers, e := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil, 0)
if len(newPeers) >= cfg.Replicas {
peers = append([]string{}, currPeers...)
peers = append(peers, newPeers[:cfg.Replicas]...)
break
}
errs = append(errs, e)
errs.accumulate(e)
}
if peers == nil {
resp.Error = NewJSClusterNoPeersError(&errs)
resp.Error = NewJSClusterNoPeersError(errs)
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
Expand Down
113 changes: 76 additions & 37 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4544,43 +4544,80 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
}

// selectPeerError collects the reasons why peer selection failed to place a
// stream/consumer. Each flag records that at least one candidate peer was
// rejected for that reason; noMatchTags collects the placement tags no peer
// matched. The rendered message intentionally omits cluster names and peer
// counts so it can be returned to clients without leaking topology details.
type selectPeerError struct {
	excludeTag  bool                // a peer carried the jetstream-exclude placement tag
	offline     bool                // a peer was offline or missing config/stats
	noStorage   bool                // a peer lacked sufficient storage for MaxBytes
	uniqueTag   bool                // a peer failed the unique-tag-prefix requirement
	misc        bool                // miscellaneous rejection (nil node info, HA asset limit, ...)
	noJsClust   bool                // no JetStream-enabled peers existed in the cluster at all
	noMatchTags map[string]struct{} // placement tags that no peer matched
}

// Error renders the accumulated rejection reasons as a single line, e.g.
// "no suitable peers for placement, insufficient storage, tags not matched ['cloud:GCP']".
// Only reasons that were actually hit are included.
func (e *selectPeerError) Error() string {
	b := strings.Builder{}
	writeBoolErrReason := func(hasErr bool, errMsg string) {
		if !hasErr {
			return
		}
		b.WriteString(", ")
		b.WriteString(errMsg)
	}
	b.WriteString("no suitable peers for placement")
	writeBoolErrReason(e.offline, "peer offline")
	writeBoolErrReason(e.excludeTag, "exclude tag set")
	writeBoolErrReason(e.noStorage, "insufficient storage")
	writeBoolErrReason(e.uniqueTag, "server tag not unique")
	writeBoolErrReason(e.misc, "miscellaneous issue")
	writeBoolErrReason(e.noJsClust, "jetstream not enabled in cluster")
	if len(e.noMatchTags) != 0 {
		b.WriteString(", tags not matched [")
		var firstTagWritten bool
		for tag := range e.noMatchTags {
			if firstTagWritten {
				b.WriteString(", ")
			}
			firstTagWritten = true
			b.WriteRune('\'')
			b.WriteString(tag)
			b.WriteRune('\'')
		}
		b.WriteString("]")
	}
	return b.String()
}

// addMissingTag records t as a placement tag no peer matched, lazily
// allocating the backing set on first use.
func (e *selectPeerError) addMissingTag(t string) {
	if e.noMatchTags == nil {
		e.noMatchTags = map[string]struct{}{}
	}
	e.noMatchTags[t] = struct{}{}
}

// accumulate merges the rejection reasons from eAdd into e, OR-ing each flag
// and unioning the missing-tag sets. A nil eAdd is a no-op so callers can pass
// the error result of selectPeerGroup unconditionally.
func (e *selectPeerError) accumulate(eAdd *selectPeerError) {
	if eAdd == nil {
		return
	}
	acc := func(val *bool, valAdd bool) {
		if valAdd {
			*val = valAdd
		}
	}
	acc(&e.offline, eAdd.offline)
	acc(&e.excludeTag, eAdd.excludeTag)
	acc(&e.noStorage, eAdd.noStorage)
	acc(&e.uniqueTag, eAdd.uniqueTag)
	acc(&e.misc, eAdd.misc)
	acc(&e.noJsClust, eAdd.noJsClust)
	for tag := range eAdd.noMatchTags {
		e.addMissingTag(tag)
	}
}

// selectPeerGroup will select a group of peers to start a raft group.
// when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int) ([]string, *selectPeerError) {
if cluster == _EMPTY_ || cfg == nil {
return nil, &selectPeerError{cluster: cluster, misc: 1}
return nil, &selectPeerError{misc: true}
}

var maxBytes uint64
Expand Down Expand Up @@ -4658,14 +4695,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo

// An error is a result of multiple individual placement decisions.
// Which is why we keep taps on how often which one happened.
err := selectPeerError{cluster: cluster}
err := selectPeerError{}

// Shuffle them up.
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
for _, p := range peers {
si, ok := s.nodeToInfo.Load(p.ID)
if !ok || si == nil {
err.misc++
err.misc = true
continue
}
ni := si.(nodeInfo)
Expand All @@ -4674,12 +4711,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
s.Debugf("Peer selection: discard %s@%s reason: not target cluster %s", ni.name, ni.cluster, cluster)
continue
}
err.clusterPeers++

// If we know its offline or we do not have config or err don't consider.
if ni.offline || ni.cfg == nil || ni.stats == nil {
s.Debugf("Peer selection: discard %s@%s reason: offline", ni.name, ni.cluster)
err.offline++
err.offline = true
continue
}

Expand All @@ -4693,7 +4729,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
if ni.tags.Contains(jsExcludePlacement) {
s.Debugf("Peer selection: discard %s@%s tags: %v reason: %s present",
ni.name, ni.cluster, ni.tags, jsExcludePlacement)
err.excludeTag++
err.excludeTag = true
continue
}

Expand All @@ -4704,11 +4740,11 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
matched = false
s.Debugf("Peer selection: discard %s@%s tags: %v reason: mandatory tag %s not present",
ni.name, ni.cluster, ni.tags, t)
err.addMissingTag(t)
break
}
}
if !matched {
err.noTagMatch++
continue
}
}
Expand Down Expand Up @@ -4741,14 +4777,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
if maxBytes > 0 && maxBytes > available {
s.Warnf("Peer selection: discard %s@%s (Max Bytes: %d) exceeds available %s storage of %d bytes",
ni.name, ni.cluster, maxBytes, cfg.Storage.String(), available)
err.noStorage++
err.noStorage = true
continue
}
// HAAssets contain _meta_ which we want to ignore
if maxHaAssets > 0 && ni.stats != nil && ni.stats.HAAssets > maxHaAssets {
s.Warnf("Peer selection: discard %s@%s (HA Asset Count: %d) exceeds max ha asset limit of %d for stream placement",
ni.name, ni.cluster, ni.stats.HAAssets, maxHaAssets)
err.misc++
err.misc = true
continue
}

Expand All @@ -4761,7 +4797,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
s.Debugf("Peer selection: discard %s@%s tags:%v reason: unique prefix %s not present",
ni.name, ni.cluster, ni.tags)
}
err.uniqueTag++
err.uniqueTag = true
continue
}
}
Expand All @@ -4773,6 +4809,9 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
if len(nodes) < (r - len(existing)) {
s.Debugf("Peer selection: required %d nodes but found %d (cluster: %s replica: %d existing: %v/%d peers: %d result-peers: %d err: %+v)",
(r - len(existing)), len(nodes), cluster, r, existing, replaceFirstExisting, len(peers), len(nodes), err)
if len(peers) == 0 {
err.noJsClust = true
}
return nil, &err
}
// Sort based on available from most to least.
Expand Down Expand Up @@ -4838,7 +4877,7 @@ func tieredStreamAndReservationCount(asa map[string]*streamAssignment, tier stri

// createGroupForStream will create a group for assignment for the stream.
// Lock should be held.
func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerErrors) {
func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*raftGroup, *selectPeerError) {
replicas := cfg.Replicas
if replicas == 0 {
replicas = 1
Expand All @@ -4857,16 +4896,16 @@ func (js *jetStream) createGroupForStream(ci *ClientInfo, cfg *StreamConfig) (*r
}

// Need to create a group here.
errFirst := selectPeerErrors{}
errs := &selectPeerError{}
for _, cn := range clusters {
peers, err := cc.selectPeerGroup(replicas, cn, cfg, nil, 0)
if len(peers) < replicas {
errFirst = append(errFirst, err)
errs.accumulate(err)
continue
}
return &raftGroup{Name: groupNameForStream(peers, cfg.Storage), Storage: cfg.Storage, Peers: peers, Cluster: cn}, nil
}
return nil, &errFirst
return nil, errs
}

func (acc *Account) selectLimits(cfg *StreamConfig) (*JetStreamAccountLimits, string, *jsAccount, *ApiError) {
Expand Down
10 changes: 4 additions & 6 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestJetStreamClusterStreamLimitWithAccountDefaults(t *testing.T) {
Replicas: 2,
MaxBytes: 15 * 1024 * 1024,
})
require_Contains(t, err.Error(), "no suitable peers for placement")
require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage")
}

func TestJetStreamClusterSingleReplicaStreams(t *testing.T) {
Expand Down Expand Up @@ -3644,8 +3644,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
c.Subjects = []string{c.Name}
_, err := js.AddStream(&c)
require_Error(t, err)
require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers",
"excludeTag: 1", "offline: 0", "uniqueTag: 0")
require_Contains(t, err.Error(), "no suitable peers for placement", "exclude tag set")
}

// Test update failure
Expand All @@ -3656,8 +3655,7 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
cfg.Replicas = 3
_, err = js.UpdateStream(cfg)
require_Error(t, err)
require_Contains(t, err.Error(), "no suitable peers for placement", "3 peers",
"excludeTag: 1", "offline: 0", "uniqueTag: 0")
require_Contains(t, err.Error(), "no suitable peers for placement", "exclude tag set")
// Test tag reload removing !jetstream tag, and allowing placement again

srv := c.serverByName("S-1")
Expand Down Expand Up @@ -9579,7 +9577,7 @@ func TestJetStreamClusterBalancedPlacement(t *testing.T) {
Replicas: 2,
MaxBytes: 1 * 1024 * 1024 * 1024,
})
require_Contains(t, err.Error(), "no suitable peers for placement")
require_Contains(t, err.Error(), "no suitable peers for placement", "insufficient storage")
}

func TestJetStreamClusterConsumerPendingBug(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (
// JSClusterIncompleteErr incomplete results
JSClusterIncompleteErr ErrorIdentifier = 10004

// JSClusterNoPeersErrF no suitable peers for placement: {err}
// JSClusterNoPeersErrF Error causing no peers to be available ({err})
JSClusterNoPeersErrF ErrorIdentifier = 10005

// JSClusterNotActiveErr JetStream not in clustered mode
Expand Down Expand Up @@ -401,7 +401,7 @@ var (
JSAccountResourcesExceededErr: {Code: 400, ErrCode: 10002, Description: "resource limits exceeded for account"},
JSBadRequestErr: {Code: 400, ErrCode: 10003, Description: "bad request"},
JSClusterIncompleteErr: {Code: 503, ErrCode: 10004, Description: "incomplete results"},
JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "no suitable peers for placement: {err}"},
JSClusterNoPeersErrF: {Code: 400, ErrCode: 10005, Description: "{err}"},
JSClusterNotActiveErr: {Code: 500, ErrCode: 10006, Description: "JetStream not in clustered mode"},
JSClusterNotAssignedErr: {Code: 500, ErrCode: 10007, Description: "JetStream cluster not assigned to this server"},
JSClusterNotAvailErr: {Code: 503, ErrCode: 10008, Description: "JetStream system temporarily unavailable"},
Expand Down Expand Up @@ -583,7 +583,7 @@ func NewJSClusterIncompleteError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSClusterIncompleteErr]
}

// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "no suitable peers for placement: {err}"
// NewJSClusterNoPeersError creates a new JSClusterNoPeersErrF error: "{err}"
func NewJSClusterNoPeersError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
Expand Down
11 changes: 6 additions & 5 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestJetStreamSuperClusterMetaPlacement(t *testing.T) {

// Make sure we get correct errors for tags and bad or unavailable cluster placement.
sdr := stepdown("C22")
if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no suitable peers") {
if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no replacement peer connected") {
t.Fatalf("Got incorrect error result: %+v", sdr.Error)
}
// Should work.
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
si, err := js.AddStream(&nats.StreamConfig{Name: name, Replicas: test.replicas, Placement: test.placement})
if test.fail {
require_Error(t, err)
require_Contains(t, err.Error(), "no suitable peers for placement")
require_Contains(t, err.Error(), "no suitable peers for placement", "server tag not unique")
return
}
require_NoError(t, err)
Expand Down Expand Up @@ -1503,7 +1503,8 @@ func TestJetStreamSuperClusterStreamTagPlacement(t *testing.T) {
Subjects: []string{"foo"},
Placement: &nats.Placement{Tags: tags},
})
require_Contains(t, err.Error(), "no suitable peers for placement")
require_Contains(t, err.Error(), "no suitable peers for placement", "tags not matched")
require_Contains(t, err.Error(), tags...)
}

placeErr("C1", []string{"cloud:GCP", "country:US"})
Expand Down Expand Up @@ -2393,7 +2394,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) {
_, err = js.AddStream(&nats.StreamConfig{Name: "S3", Replicas: 3, Placement: &nats.Placement{Cluster: "C1"}})
require_Error(t, err)
require_Contains(t, err.Error(), "nats: no suitable peers for placement")
require_Contains(t, err.Error(), "misc: 3")
require_Contains(t, err.Error(), "miscellaneous issue")
require_NoError(t, js.DeleteStream("S1"))
waitStatsz(3, 2)
waitStatsz(3, 1)
Expand Down Expand Up @@ -2427,7 +2428,7 @@ func TestJetStreamSuperClusterMaxHaAssets(t *testing.T) {
_, err = js.UpdateStream(&nats.StreamConfig{Name: "S2", Replicas: 3, Placement: &nats.Placement{Cluster: "C2"}})
require_Error(t, err)
require_Contains(t, err.Error(), "nats: no suitable peers for placement")
require_Contains(t, err.Error(), "misc: 3")
require_Contains(t, err.Error(), "miscellaneous issue")
}

func TestJetStreamSuperClusterStreamAlternates(t *testing.T) {
Expand Down

0 comments on commit f7cb5b1

Please sign in to comment.