Skip to content

Commit

Permalink
Improve StreamMoveInProgressError (#3376)
Browse files Browse the repository at this point in the history
by adding progress indicators

Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed Aug 18, 2022
1 parent 2573b43 commit e11491d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 10 deletions.
4 changes: 2 additions & 2 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,10 @@
"deprecates": ""
},
{
"constant": "JSStreamMoveInProgress",
"constant": "JSStreamMoveInProgressF",
"code": 400,
"error_code": 10124,
"description": "stream move already in progress",
"description": "stream move already in progress: {msg}",
"comment": "",
"help": "",
"url": "",
Expand Down
29 changes: 28 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5076,7 +5076,34 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su

// Check if this is a move request, but no cancellation, and we are already moving this stream.
if isMoveRequest && !isMoveCancel && osa.Config.Replicas != len(rg.Peers) {
resp.Error = NewJSStreamMoveInProgressError()
// obtain stats to include in error message
msg := _EMPTY_
if s.allPeersOffline(rg) {
msg = fmt.Sprintf("all %d peers offline", len(rg.Peers))
} else {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
msg = fmt.Sprintf("error retrieving info: %s", err.Error())
} else if si := si.(*StreamInfo); si != nil {
currentCount := 0
if si.Cluster.Leader != _EMPTY_ {
currentCount++
}
combinedLag := uint64(0)
for _, r := range si.Cluster.Replicas {
if r.Current {
currentCount++
}
combinedLag += r.Lag
}
msg = fmt.Sprintf("total peers: %d, current peers: %d, combined lag: %d",
len(rg.Peers), currentCount, combinedLag)
}
// Re-acquire here.
js.mu.Lock()
}
resp.Error = NewJSStreamMoveInProgressError(msg)
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
Expand Down
18 changes: 12 additions & 6 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ const (
// JSStreamMoveAndScaleErr can not move and scale a stream in a single update
JSStreamMoveAndScaleErr ErrorIdentifier = 10123

// JSStreamMoveInProgress stream move already in progress
JSStreamMoveInProgress ErrorIdentifier = 10124
// JSStreamMoveInProgressF stream move already in progress: {msg}
JSStreamMoveInProgressF ErrorIdentifier = 10124

// JSStreamMoveNotInProgress stream move not in progress
JSStreamMoveNotInProgress ErrorIdentifier = 10129
Expand Down Expand Up @@ -497,7 +497,7 @@ var (
JSStreamMirrorNotUpdatableErr: {Code: 400, ErrCode: 10055, Description: "stream mirror configuration can not be updated"},
JSStreamMismatchErr: {Code: 400, ErrCode: 10056, Description: "stream name in subject does not match request"},
JSStreamMoveAndScaleErr: {Code: 400, ErrCode: 10123, Description: "can not move and scale a stream in a single update"},
JSStreamMoveInProgress: {Code: 400, ErrCode: 10124, Description: "stream move already in progress"},
JSStreamMoveInProgressF: {Code: 400, ErrCode: 10124, Description: "stream move already in progress: {msg}"},
JSStreamMoveNotInProgress: {Code: 400, ErrCode: 10129, Description: "stream move not in progress"},
JSStreamMsgDeleteFailedF: {Code: 500, ErrCode: 10057, Description: "{err}"},
JSStreamNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10128, Description: "Stream name can not contain path separators"},
Expand Down Expand Up @@ -1707,14 +1707,20 @@ func NewJSStreamMoveAndScaleError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamMoveAndScaleErr]
}

// NewJSStreamMoveInProgressError creates a new JSStreamMoveInProgress error: "stream move already in progress"
func NewJSStreamMoveInProgressError(opts ...ErrorOption) *ApiError {
// NewJSStreamMoveInProgressError creates a new JSStreamMoveInProgressF error: "stream move already in progress: {msg}"
func NewJSStreamMoveInProgressError(msg interface{}, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSStreamMoveInProgress]
e := ApiErrors[JSStreamMoveInProgressF]
args := e.toReplacerArgs([]interface{}{"{msg}", msg})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSStreamMoveNotInProgressError creates a new JSStreamMoveNotInProgress error: "stream move not in progress"
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
Replicas: replicas,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_Error(t, err, NewJSStreamMoveInProgressError())
require_Contains(t, err.Error(), "stream move already in progress")

checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
si, err := js.StreamInfo("MOVE")
Expand Down

0 comments on commit e11491d

Please sign in to comment.