Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Add] ability for operator to move streams (WIP) #3217

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
300 changes: 298 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ const (
// Will return JSON response.
JSApiRemoveServer = "$JS.API.SERVER.REMOVE"

// JSApiServerStreamMove is the endpoint to move streams off a server
// Only works from system account.
// Will return JSON response.
JSApiServerStreamMove = "$JS.API.SERVER.STREAM.MOVE"

// JSApiServerInfo is the endpoint to abtain which stream a server contains
// Only works from system account.
// Will return JSON response.
//JSApiServerStreamInfo = "$JS.API.SERVER.STREAM.INFO"
Copy link
Contributor

@ripienaar ripienaar Jul 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this come in a follow up PR? Without this is hard to know when its done moving

Copy link
Contributor

@ripienaar ripienaar Jul 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we also adding the ability to get a list of streams+account on a given server?


// jsAckT is the template for the ack message stream coming back from a consumer
// when they ACK/NAK, etc a message.
jsAckT = "$JS.ACK.%s.%s"
Expand Down Expand Up @@ -558,6 +568,36 @@ type JSApiMetaServerRemoveResponse struct {

const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"

/*
// JSApiMetaServerStreamInfoRequest will provide which streams are located on a particular server
type JSApiMetaServerStreamInfoRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"peer"`
}

// JSApiMetaServerStreamInfoResponse is the response to a peer evacuation request in the meta group.
type JSApiMetaServerStreamInfoResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
Content map[string][]StreamConfig `json:"content,omitempty"`
}

const JSApiMetaServerStreamInfoType = "io.nats.jetstream.api.v1.meta_server_stream_info"
*/

// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
type JSApiMetaServerStreamMoveRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"peer"`
// Account name the stream to move is in
Account string `json:"account"`
// Name of stream to move
Stream string `json:"string"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be stream, not string

// Ephemeral placement tags for the move
Tags []string `json:"tags,omitempty"`
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
}

// JSApiMsgGetRequest get a message request.
type JSApiMsgGetRequest struct {
Seq uint64 `json:"seq,omitempty"`
Expand Down Expand Up @@ -1360,9 +1400,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
// If we are inline with client, we still may need to do a callout for stream info
// during this call, so place in Go routine to not block client.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg)
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil)
} else {
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg)
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil)
}
return
}
Expand Down Expand Up @@ -2156,6 +2196,262 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}

/*
// Request to have the metaleader provide which streams a server contains
func (s *Server) jsLeaderServerStreamInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}

ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}

js, cc := s.getJetStreamCluster()
if js == nil && (cc != nil && cc.meta == nil) {
return
}

// Extra checks here but only leader is listening.
js.mu.RLock()
isLeader := cc.isLeader()
js.mu.RUnlock()

if !isLeader {
return
}

var resp = JSApiMetaServerStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerStreamInfoType}}

if isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var req JSApiMetaServerStreamInfoRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

if cc == nil {
// single server logic
if req.Server != s.info.Name {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

resp.Content = make(map[string][]StreamConfig)
resp.Success = true
js.mu.RLock()
for aName, jsa := range js.accounts {
jsa.mu.RLock()
for _, s := range jsa.streams {
resp.Content[aName] = append(resp.Content[aName], s.cfg)
}
jsa.mu.RUnlock()
}
js.mu.RUnlock()
return
}
// clustered code

var found string
js.mu.RLock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == req.Server {
found = p.ID
break
}
}
js.mu.RUnlock()

if found == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

resp.Content = make(map[string][]StreamConfig)
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
js.mu.Lock()
for aName, a := range cc.streams {
for _, s := range a {
contains := false
for _, p := range s.Group.Peers {
if p == found {
contains = true
break
}
}
if contains {
resp.Content[aName] = append(resp.Content[aName], *s.Config)
matthiashanel marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
js.mu.Unlock()

resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
*/

// Request to have the metaleader move a stream on a peer to another
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}

ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return
}

// Extra checks here but only leader is listening.
js.mu.RLock()
isLeader := cc.isLeader()
js.mu.RUnlock()

if !isLeader {
return
}

var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}

if isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var req JSApiMetaServerStreamMoveRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var srcPeer string
js.mu.RLock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == req.Server {
srcPeer = p.ID
break
}
}
js.mu.RUnlock()

if srcPeer == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

targetAcc, ok := s.accounts.Load(req.Account)
if !ok {
resp.Error = NewJSNoAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

streamFound := false
cfg := StreamConfig{}
currPeers := []string{}
currCluster := _EMPTY_
js.mu.Lock()
streams, ok := cc.streams[req.Account]
if ok {
sa, ok := streams[req.Stream]
if ok {
cfg = *sa.Config
streamFound = true
currPeers = sa.Group.Peers
currCluster = sa.Group.Cluster
}
}
js.mu.Unlock()

// make sure src peer is first. Removal will drop peers from the left
for i := 0; i < len(currPeers); i++ {
if currPeers[i] == srcPeer {
currPeers[i] = currPeers[0]
currPeers[0] = srcPeer
break
}
}

if !streamFound {
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// make sure client is scoped to requested account
ciNew := *(ci)
ciNew.Account = req.Account

// backup placement such that peers can be looked up with modified tag list
var origPlacement *Placement
if cfg.Placement != nil {
tmp := *cfg.Placement
origPlacement = &tmp
}

if len(req.Tags) > 0 {
if cfg.Placement == nil {
cfg.Placement = &Placement{}
}
cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
}

peers := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers)
if len(peers) <= cfg.Replicas {
// since expanding in the same cluster did not yield a result, try in different cluster
peers = nil

clusters := map[string]struct{}{}
s.nodeToInfo.Range(func(_, ni interface{}) bool {
if currCluster != ni.(nodeInfo).cluster {
clusters[ni.(nodeInfo).cluster] = struct{}{}
}
return true
})
for cluster := range clusters {
newPeers := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil)
if len(newPeers) >= cfg.Replicas {
peers = append([]string{}, currPeers...)
peers = append(peers, newPeers[:cfg.Replicas]...)
break
}
}
if peers == nil {
resp.Error = NewJSClusterNoPeersError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

cfg.Placement = origPlacement

if c.kind != ROUTER && c.kind != GATEWAY {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
go s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
} else {
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
}
}

// Request to have the meta leader stepdown.
// These will only be received the the meta leaders, so less checking needed.
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down