-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Add] ability for operator to move streams (WIP) #3217
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -187,6 +187,16 @@ const ( | |
// Will return JSON response. | ||
JSApiRemoveServer = "$JS.API.SERVER.REMOVE" | ||
|
||
// JSApiServerStreamMove is the endpoint to move streams off a server | ||
// Only works from system account. | ||
// Will return JSON response. | ||
JSApiServerStreamMove = "$JS.API.SERVER.STREAM.MOVE" | ||
|
||
// JSApiServerInfo is the endpoint to abtain which stream a server contains | ||
// Only works from system account. | ||
// Will return JSON response. | ||
//JSApiServerStreamInfo = "$JS.API.SERVER.STREAM.INFO" | ||
|
||
// jsAckT is the template for the ack message stream coming back from a consumer | ||
// when they ACK/NAK, etc a message. | ||
jsAckT = "$JS.ACK.%s.%s" | ||
|
@@ -558,6 +568,36 @@ type JSApiMetaServerRemoveResponse struct { | |
|
||
const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response" | ||
|
||
/* | ||
// JSApiMetaServerStreamInfoRequest will provide which streams are located on a particular server | ||
type JSApiMetaServerStreamInfoRequest struct { | ||
// Server name of the peer to be evacuated. | ||
Server string `json:"peer"` | ||
} | ||
|
||
// JSApiMetaServerStreamInfoResponse is the response to a peer evacuation request in the meta group. | ||
type JSApiMetaServerStreamInfoResponse struct { | ||
ApiResponse | ||
Success bool `json:"success,omitempty"` | ||
Content map[string][]StreamConfig `json:"content,omitempty"` | ||
} | ||
|
||
const JSApiMetaServerStreamInfoType = "io.nats.jetstream.api.v1.meta_server_stream_info" | ||
*/ | ||
|
||
// JSApiMetaServerStreamMoveRequest will move a stream on a server to another | ||
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType | ||
type JSApiMetaServerStreamMoveRequest struct { | ||
// Server name of the peer to be evacuated. | ||
Server string `json:"peer"` | ||
// Account name the stream to move is in | ||
Account string `json:"account"` | ||
// Name of stream to move | ||
Stream string `json:"string"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be |
||
// Ephemeral placement tags for the move | ||
Tags []string `json:"tags,omitempty"` | ||
matthiashanel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// JSApiMsgGetRequest get a message request. | ||
type JSApiMsgGetRequest struct { | ||
Seq uint64 `json:"seq,omitempty"` | ||
|
@@ -1360,9 +1400,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account, | |
// If we are inline with client, we still may need to do a callout for stream info | ||
// during this call, so place in Go routine to not block client. | ||
if c.kind != ROUTER && c.kind != GATEWAY { | ||
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg) | ||
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil) | ||
} else { | ||
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg) | ||
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil) | ||
} | ||
return | ||
} | ||
|
@@ -2156,6 +2196,262 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac | |
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
} | ||
|
||
/* | ||
// Request to have the metaleader provide which streams a server contains | ||
func (s *Server) jsLeaderServerStreamInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { | ||
if c == nil || !s.JetStreamEnabled() { | ||
return | ||
} | ||
|
||
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) | ||
if err != nil { | ||
s.Warnf(badAPIRequestT, msg) | ||
return | ||
} | ||
|
||
js, cc := s.getJetStreamCluster() | ||
if js == nil && (cc != nil && cc.meta == nil) { | ||
return | ||
} | ||
|
||
// Extra checks here but only leader is listening. | ||
js.mu.RLock() | ||
isLeader := cc.isLeader() | ||
js.mu.RUnlock() | ||
|
||
if !isLeader { | ||
return | ||
} | ||
|
||
var resp = JSApiMetaServerStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerStreamInfoType}} | ||
|
||
if isEmptyRequest(msg) { | ||
resp.Error = NewJSBadRequestError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
var req JSApiMetaServerStreamInfoRequest | ||
if err := json.Unmarshal(msg, &req); err != nil { | ||
resp.Error = NewJSInvalidJSONError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
if cc == nil { | ||
// single server logic | ||
if req.Server != s.info.Name { | ||
resp.Error = NewJSClusterServerNotMemberError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
resp.Content = make(map[string][]StreamConfig) | ||
resp.Success = true | ||
js.mu.RLock() | ||
for aName, jsa := range js.accounts { | ||
jsa.mu.RLock() | ||
for _, s := range jsa.streams { | ||
resp.Content[aName] = append(resp.Content[aName], s.cfg) | ||
} | ||
jsa.mu.RUnlock() | ||
} | ||
js.mu.RUnlock() | ||
return | ||
} | ||
// clustered code | ||
|
||
var found string | ||
js.mu.RLock() | ||
for _, p := range cc.meta.Peers() { | ||
si, ok := s.nodeToInfo.Load(p.ID) | ||
if ok && si.(nodeInfo).name == req.Server { | ||
found = p.ID | ||
break | ||
} | ||
} | ||
js.mu.RUnlock() | ||
|
||
if found == _EMPTY_ { | ||
resp.Error = NewJSClusterServerNotMemberError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
resp.Content = make(map[string][]StreamConfig) | ||
matthiashanel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
js.mu.Lock() | ||
for aName, a := range cc.streams { | ||
for _, s := range a { | ||
contains := false | ||
for _, p := range s.Group.Peers { | ||
if p == found { | ||
contains = true | ||
break | ||
} | ||
} | ||
if contains { | ||
resp.Content[aName] = append(resp.Content[aName], *s.Config) | ||
matthiashanel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
js.mu.Unlock() | ||
|
||
resp.Success = true | ||
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
} | ||
*/ | ||
|
||
// Request to have the metaleader move a stream on a peer to another | ||
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { | ||
if c == nil || !s.JetStreamEnabled() { | ||
return | ||
} | ||
|
||
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) | ||
if err != nil { | ||
s.Warnf(badAPIRequestT, msg) | ||
return | ||
} | ||
|
||
js, cc := s.getJetStreamCluster() | ||
if js == nil || cc == nil || cc.meta == nil { | ||
return | ||
} | ||
|
||
// Extra checks here but only leader is listening. | ||
js.mu.RLock() | ||
isLeader := cc.isLeader() | ||
js.mu.RUnlock() | ||
|
||
if !isLeader { | ||
return | ||
} | ||
|
||
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}} | ||
|
||
if isEmptyRequest(msg) { | ||
resp.Error = NewJSBadRequestError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
var req JSApiMetaServerStreamMoveRequest | ||
if err := json.Unmarshal(msg, &req); err != nil { | ||
resp.Error = NewJSInvalidJSONError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
var srcPeer string | ||
js.mu.RLock() | ||
for _, p := range cc.meta.Peers() { | ||
si, ok := s.nodeToInfo.Load(p.ID) | ||
if ok && si.(nodeInfo).name == req.Server { | ||
srcPeer = p.ID | ||
break | ||
} | ||
} | ||
js.mu.RUnlock() | ||
|
||
if srcPeer == _EMPTY_ { | ||
resp.Error = NewJSClusterServerNotMemberError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
targetAcc, ok := s.accounts.Load(req.Account) | ||
if !ok { | ||
resp.Error = NewJSNoAccountError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
streamFound := false | ||
cfg := StreamConfig{} | ||
currPeers := []string{} | ||
currCluster := _EMPTY_ | ||
js.mu.Lock() | ||
streams, ok := cc.streams[req.Account] | ||
if ok { | ||
sa, ok := streams[req.Stream] | ||
if ok { | ||
cfg = *sa.Config | ||
streamFound = true | ||
currPeers = sa.Group.Peers | ||
currCluster = sa.Group.Cluster | ||
} | ||
} | ||
js.mu.Unlock() | ||
|
||
// make sure src peer is first. Removal will drop peers from the left | ||
for i := 0; i < len(currPeers); i++ { | ||
if currPeers[i] == srcPeer { | ||
currPeers[i] = currPeers[0] | ||
currPeers[0] = srcPeer | ||
break | ||
} | ||
} | ||
|
||
if !streamFound { | ||
resp.Error = NewJSStreamNotFoundError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
|
||
// make sure client is scoped to requested account | ||
ciNew := *(ci) | ||
ciNew.Account = req.Account | ||
|
||
// backup placement such that peers can be looked up with modified tag list | ||
var origPlacement *Placement | ||
if cfg.Placement != nil { | ||
tmp := *cfg.Placement | ||
origPlacement = &tmp | ||
} | ||
|
||
if len(req.Tags) > 0 { | ||
if cfg.Placement == nil { | ||
cfg.Placement = &Placement{} | ||
} | ||
cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...) | ||
} | ||
|
||
peers := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers) | ||
if len(peers) <= cfg.Replicas { | ||
// since expanding in the same cluster did not yield a result, try in different cluster | ||
peers = nil | ||
|
||
clusters := map[string]struct{}{} | ||
s.nodeToInfo.Range(func(_, ni interface{}) bool { | ||
if currCluster != ni.(nodeInfo).cluster { | ||
clusters[ni.(nodeInfo).cluster] = struct{}{} | ||
} | ||
return true | ||
}) | ||
for cluster := range clusters { | ||
newPeers := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil) | ||
if len(newPeers) >= cfg.Replicas { | ||
peers = append([]string{}, currPeers...) | ||
peers = append(peers, newPeers[:cfg.Replicas]...) | ||
break | ||
} | ||
} | ||
if peers == nil { | ||
resp.Error = NewJSClusterNoPeersError() | ||
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) | ||
return | ||
} | ||
} | ||
|
||
cfg.Placement = origPlacement | ||
|
||
if c.kind != ROUTER && c.kind != GATEWAY { | ||
derekcollison marked this conversation as resolved.
Show resolved
Hide resolved
|
||
go s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers) | ||
} else { | ||
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers) | ||
} | ||
} | ||
|
||
// Request to have the meta leader stepdown. | ||
// These will only be received the the meta leaders, so less checking needed. | ||
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this come in a follow up PR? Without this is hard to know when its done moving
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we also adding the ability to get a list of streams+account on a given server?