Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve StreamMoveInProgressError #3376

Merged
merged 1 commit into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,10 @@
"deprecates": ""
},
{
"constant": "JSStreamMoveInProgress",
"constant": "JSStreamMoveInProgressF",
"code": 400,
"error_code": 10124,
"description": "stream move already in progress",
"description": "stream move already in progress: {msg}",
"comment": "",
"help": "",
"url": "",
Expand Down
29 changes: 28 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5092,7 +5092,34 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su

// Check if this is a move request, but no cancellation, and we are already moving this stream.
if isMoveRequest && !isMoveCancel && osa.Config.Replicas != len(rg.Peers) {
resp.Error = NewJSStreamMoveInProgressError()
// obtain stats to include in error message
msg := _EMPTY_
if s.allPeersOffline(rg) {
msg = fmt.Sprintf("all %d peers offline", len(rg.Peers))
} else {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
msg = fmt.Sprintf("error retrieving info: %s", err.Error())
} else if si := si.(*StreamInfo); si != nil {
currentCount := 0
if si.Cluster.Leader != _EMPTY_ {
currentCount++
}
combinedLag := uint64(0)
for _, r := range si.Cluster.Replicas {
if r.Current {
currentCount++
}
combinedLag += r.Lag
}
msg = fmt.Sprintf("total peers: %d, current peers: %d, combined lag: %d",
len(rg.Peers), currentCount, combinedLag)
}
// Re-acquire here.
js.mu.Lock()
}
resp.Error = NewJSStreamMoveInProgressError(msg)
s.sendAPIErrResponse(ci, acc, subject, reply, string(rmsg), s.jsonResponse(&resp))
return
}
Expand Down
18 changes: 12 additions & 6 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ const (
// JSStreamMoveAndScaleErr can not move and scale a stream in a single update
JSStreamMoveAndScaleErr ErrorIdentifier = 10123

// JSStreamMoveInProgress stream move already in progress
JSStreamMoveInProgress ErrorIdentifier = 10124
// JSStreamMoveInProgressF stream move already in progress: {msg}
JSStreamMoveInProgressF ErrorIdentifier = 10124

// JSStreamMoveNotInProgress stream move not in progress
JSStreamMoveNotInProgress ErrorIdentifier = 10129
Expand Down Expand Up @@ -497,7 +497,7 @@ var (
JSStreamMirrorNotUpdatableErr: {Code: 400, ErrCode: 10055, Description: "stream mirror configuration can not be updated"},
JSStreamMismatchErr: {Code: 400, ErrCode: 10056, Description: "stream name in subject does not match request"},
JSStreamMoveAndScaleErr: {Code: 400, ErrCode: 10123, Description: "can not move and scale a stream in a single update"},
JSStreamMoveInProgress: {Code: 400, ErrCode: 10124, Description: "stream move already in progress"},
JSStreamMoveInProgressF: {Code: 400, ErrCode: 10124, Description: "stream move already in progress: {msg}"},
JSStreamMoveNotInProgress: {Code: 400, ErrCode: 10129, Description: "stream move not in progress"},
JSStreamMsgDeleteFailedF: {Code: 500, ErrCode: 10057, Description: "{err}"},
JSStreamNameContainsPathSeparatorsErr: {Code: 400, ErrCode: 10128, Description: "Stream name can not contain path separators"},
Expand Down Expand Up @@ -1707,14 +1707,20 @@ func NewJSStreamMoveAndScaleError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamMoveAndScaleErr]
}

// NewJSStreamMoveInProgressError creates a new JSStreamMoveInProgress error: "stream move already in progress"
func NewJSStreamMoveInProgressError(opts ...ErrorOption) *ApiError {
// NewJSStreamMoveInProgressError creates a new JSStreamMoveInProgressF error: "stream move already in progress: {msg}"
func NewJSStreamMoveInProgressError(msg interface{}, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSStreamMoveInProgress]
e := ApiErrors[JSStreamMoveInProgressF]
args := e.toReplacerArgs([]interface{}{"{msg}", msg})
return &ApiError{
Code: e.Code,
ErrCode: e.ErrCode,
Description: strings.NewReplacer(args...).Replace(e.Description),
}
}

// NewJSStreamMoveNotInProgressError creates a new JSStreamMoveNotInProgress error: "stream move not in progress"
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,7 @@ func TestJetStreamSuperClusterMovingStreamsAndConsumers(t *testing.T) {
Replicas: replicas,
Placement: &nats.Placement{Tags: []string{"cloud:aws"}},
})
require_Error(t, err, NewJSStreamMoveInProgressError())
require_Contains(t, err.Error(), "stream move already in progress")

checkFor(t, 10*time.Second, 10*time.Millisecond, func() error {
si, err := js.StreamInfo("MOVE")
Expand Down