Skip to content

Commit

Permalink
[Add] ability for operator to move streams
Browse files Browse the repository at this point in the history
Also added:
ability to reload tags
special tag (!jetstream) to remove peer from peer placement
$JS.API.SERVER.STREAM.MOVE subject to initiate move away from a server

Signed-off-by: Matthias Hanel <mh@synadia.com>
  • Loading branch information
matthiashanel committed Jun 25, 2022
1 parent 67a182f commit 555b9c1
Show file tree
Hide file tree
Showing 4 changed files with 684 additions and 60 deletions.
300 changes: 298 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ const (
// Will return JSON response.
JSApiRemoveServer = "$JS.API.SERVER.REMOVE"

// JSApiServerStreamMove is the endpoint to move streams off a server
// Only works from system account.
// Will return JSON response.
JSApiServerStreamMove = "$JS.API.SERVER.STREAM.MOVE"

// JSApiServerInfo is the endpoint to abtain which stream a server contains
// Only works from system account.
// Will return JSON response.
//JSApiServerStreamInfo = "$JS.API.SERVER.STREAM.INFO"

// jsAckT is the template for the ack message stream coming back from a consumer
// when they ACK/NAK, etc a message.
jsAckT = "$JS.ACK.%s.%s"
Expand Down Expand Up @@ -558,6 +568,36 @@ type JSApiMetaServerRemoveResponse struct {

const JSApiMetaServerRemoveResponseType = "io.nats.jetstream.api.v1.meta_server_remove_response"

/*
// JSApiMetaServerStreamInfoRequest will provide which streams are located on a particular server
type JSApiMetaServerStreamInfoRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"peer"`
}
// JSApiMetaServerStreamInfoResponse is the response to a peer evacuation request in the meta group.
type JSApiMetaServerStreamInfoResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
Content map[string][]StreamConfig `json:"content,omitempty"`
}
const JSApiMetaServerStreamInfoType = "io.nats.jetstream.api.v1.meta_server_stream_info"
*/

// JSApiMetaServerStreamMoveRequest will move a stream on a server to another
// response to this will come as JSApiStreamUpdateResponse/JSApiStreamUpdateResponseType
type JSApiMetaServerStreamMoveRequest struct {
// Server name of the peer to be evacuated.
Server string `json:"peer"`
// Account name the stream to move is in
Account string `json:"account"`
// Name of stream to move
Stream string `json:"string"`
// Ephemeral placement tags for the move
Tags []string `json:"tags,omitempty"`
}

// JSApiMsgGetRequest get a message request.
type JSApiMsgGetRequest struct {
Seq uint64 `json:"seq,omitempty"`
Expand Down Expand Up @@ -1360,9 +1400,9 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
// If we are inline with client, we still may need to do a callout for stream info
// during this call, so place in Go routine to not block client.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg)
go s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil)
} else {
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg)
s.jsClusteredStreamUpdateRequest(ci, acc, subject, reply, rmsg, &cfg, nil)
}
return
}
Expand Down Expand Up @@ -2156,6 +2196,262 @@ func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Ac
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}

/*
// Request to have the metaleader provide which streams a server contains
func (s *Server) jsLeaderServerStreamInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}
ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}
js, cc := s.getJetStreamCluster()
if js == nil && (cc != nil && cc.meta == nil) {
return
}
// Extra checks here but only leader is listening.
js.mu.RLock()
isLeader := cc.isLeader()
js.mu.RUnlock()
if !isLeader {
return
}
var resp = JSApiMetaServerStreamInfoResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerStreamInfoType}}
if isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
var req JSApiMetaServerStreamInfoRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if cc == nil {
// single server logic
if req.Server != s.info.Name {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Content = make(map[string][]StreamConfig)
resp.Success = true
js.mu.RLock()
for aName, jsa := range js.accounts {
jsa.mu.RLock()
for _, s := range jsa.streams {
resp.Content[aName] = append(resp.Content[aName], s.cfg)
}
jsa.mu.RUnlock()
}
js.mu.RUnlock()
return
}
// clustered code
var found string
js.mu.RLock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == req.Server {
found = p.ID
break
}
}
js.mu.RUnlock()
if found == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.Content = make(map[string][]StreamConfig)
js.mu.Lock()
for aName, a := range cc.streams {
for _, s := range a {
contains := false
for _, p := range s.Group.Peers {
if p == found {
contains = true
break
}
}
if contains {
resp.Content[aName] = append(resp.Content[aName], *s.Config)
}
}
}
js.mu.Unlock()
resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
*/

// Request to have the metaleader move a stream on a peer to another
func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}

ci, acc, _, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || cc.meta == nil {
return
}

// Extra checks here but only leader is listening.
js.mu.RLock()
isLeader := cc.isLeader()
js.mu.RUnlock()

if !isLeader {
return
}

var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}

if isEmptyRequest(msg) {
resp.Error = NewJSBadRequestError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var req JSApiMetaServerStreamMoveRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

var srcPeer string
js.mu.RLock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == req.Server {
srcPeer = p.ID
break
}
}
js.mu.RUnlock()

if srcPeer == _EMPTY_ {
resp.Error = NewJSClusterServerNotMemberError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

targetAcc, ok := s.accounts.Load(req.Account)
if !ok {
resp.Error = NewJSNoAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

streamFound := false
cfg := StreamConfig{}
currPeers := []string{}
currCluster := _EMPTY_
js.mu.Lock()
streams, ok := cc.streams[req.Account]
if ok {
sa, ok := streams[req.Stream]
if ok {
cfg = *sa.Config
streamFound = true
currPeers = sa.Group.Peers
currCluster = sa.Group.Cluster
}
}
js.mu.Unlock()

// make sure src peer is first. Removal will drop peers from the left
for i := 0; i < len(currPeers); i++ {
if currPeers[i] == srcPeer {
currPeers[i] = currPeers[0]
currPeers[0] = srcPeer
break
}
}

if !streamFound {
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// make sure client is scoped to requested account
ciNew := *(ci)
ciNew.Account = req.Account

// backup placement such that peers can be looked up with modified tag list
var origPlacement *Placement
if cfg.Placement != nil {
tmp := *cfg.Placement
origPlacement = &tmp
}

if len(req.Tags) > 0 {
if cfg.Placement == nil {
cfg.Placement = &Placement{}
}
cfg.Placement.Tags = append(cfg.Placement.Tags, req.Tags...)
}

peers := cc.selectPeerGroup(cfg.Replicas+1, currCluster, &cfg, currPeers)
if len(peers) <= cfg.Replicas {
// since expanding in the same cluster did not yield a result, try in different cluster
peers = nil

clusters := map[string]struct{}{}
s.nodeToInfo.Range(func(_, ni interface{}) bool {
if currCluster != ni.(nodeInfo).cluster {
clusters[ni.(nodeInfo).cluster] = struct{}{}
}
return true
})
for cluster := range clusters {
newPeers := cc.selectPeerGroup(cfg.Replicas, cluster, &cfg, nil)
if len(newPeers) >= cfg.Replicas {
peers = append([]string{}, currPeers...)
peers = append(peers, newPeers[:cfg.Replicas]...)
break
}
}
if peers == nil {
resp.Error = NewJSClusterNoPeersError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}

cfg.Placement = origPlacement

if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
} else {
s.jsClusteredStreamUpdateRequest(&ciNew, targetAcc.(*Account), subject, reply, rmsg, &cfg, peers)
}
}

// Request to have the meta leader stepdown.
// These will only be received the the meta leaders, so less checking needed.
func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
Expand Down

0 comments on commit 555b9c1

Please sign in to comment.