Merge pull request #2905 from nats-io/r_update_improvements
Made improvements to scale up and down for streams and consumers.
derekcollison committed Mar 7, 2022
2 parents 4e5150e + 58da4b9 commit 24067d7
Showing 4 changed files with 169 additions and 7 deletions.
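
As context for the diffs below, here is a minimal client-side sketch (not part of the commit) of the scale up/down behavior being improved, using the nats.go JetStream API the same way the new test does. The server URL, stream and durable names, and a running 3-node JetStream cluster are assumptions for illustration, not taken from this change.

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Assumes a 3-node JetStream cluster is reachable at this URL.
	nc, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Start at R3 with a durable consumer, mirroring the new test below.
	cfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3}
	if _, err := js.AddStream(cfg); err != nil {
		log.Fatal(err)
	}
	if _, err := js.SubscribeSync("foo", nats.Durable("cat")); err != nil {
		log.Fatal(err)
	}

	// Scale the stream down to R1. On the server this clears the raft node
	// and remaps the durable consumer onto the single remaining peer.
	cfg.Replicas = 1
	if _, err := js.UpdateStream(cfg); err != nil {
		log.Fatal(err)
	}

	// Scale back up to R3. The raft group is re-created and consumers follow.
	cfg.Replicas = 3
	if _, err := js.UpdateStream(cfg); err != nil {
		log.Fatal(err)
	}

	// The consumer's cluster info should reflect the stream's replica count.
	ci, err := js.ConsumerInfo("TEST", "cat")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("consumer leader=%q, peers=%d", ci.Cluster.Leader, len(ci.Cluster.Replicas)+1)
}

In practice (as the new test shows) a caller may need to wait for stream and consumer leaders to be re-elected between the update and the info call.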
10 changes: 10 additions & 0 deletions server/consumer.go
@@ -749,6 +749,16 @@ func (o *consumer) checkQueueInterest() {
}
}

// clearNode clears our raft node if we have one, e.g. when we scale down to 1.
func (o *consumer) clearNode() {
o.mu.Lock()
defer o.mu.Unlock()
if o.node != nil {
o.node.Delete()
o.node = nil
}
}

// Lock should be held.
func (o *consumer) isLeader() bool {
if o.node != nil {
67 changes: 61 additions & 6 deletions server/jetstream_cluster.go
@@ -1159,10 +1159,10 @@ func (js *jetStream) processAddPeer(peer string) {
cc.meta.Propose(encodeAddStreamAssignment(csa))
for _, ca := range sa.consumers {
// Ephemerals are R=1, so only auto-remap durables or groups with R>1.
if ca.Config.Durable != _EMPTY_ {
cca := *ca
if ca.Config.Durable != _EMPTY_ || len(ca.Group.Peers) > 1 {
cca := ca.copyGroup()
cca.Group.Peers = csa.Group.Peers
cc.meta.Propose(encodeAddConsumerAssignment(&cca))
cc.meta.Propose(encodeAddConsumerAssignment(cca))
}
}
}
@@ -2313,6 +2313,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss

mset, err := acc.lookupStream(sa.Config.Name)
if err == nil && mset != nil {
var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
js.createRaftGroup(rg, storage)
@@ -2321,6 +2322,10 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
} else if numReplicas == 1 && alreadyRunning {
// We downgraded to R1. Make sure we clean up the raft node and the stream monitor.
mset.removeNode()
// Make sure we are leader now that we are R1.
needsSetLeader = true
// In case we need to shut down the cluster-specific subs, etc.
mset.setLeader(false)
js.mu.Lock()
sa.Group.node = nil
js.mu.Unlock()
@@ -2330,6 +2335,10 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
mset.setStreamAssignment(osa)
}
// Make sure we are the leader now that we are R1.
if needsSetLeader {
mset.setLeader(true)
}
}

// If not found we must be expanding into this node since if we are here we know we are a member.
@@ -2892,7 +2901,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
} else {
if didCreate {
o.setCreatedTime(ca.Created)
} else {
// Check for a scale down to 1.
if rg.node != nil && len(rg.Peers) == 1 {
o.clearNode()
o.setLeader(true)
// Need to clear from rg too.
js.mu.Lock()
rg.node = nil
js.mu.Unlock()
return
}
}

// Start our monitoring routine.
if rg.node == nil {
// Single replica consumer, process manually here.
@@ -4027,6 +4048,8 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su

// Check for replica changes.
rg := osa.Group
var consumers []*consumerAssignment

if newCfg.Replicas != len(rg.Peers) {
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
@@ -4037,18 +4060,45 @@
return
}
// Single nodes are not recorded by the NRG layer so we can rename.
if len(rg.Peers) == 1 {
if len(peers) == 1 {
rg.Name = groupNameForStream(peers, rg.Storage)
} else if len(rg.Peers) == 1 {
// This is a scale up from being a singleton; set preferred to that singleton.
rg.Preferred = rg.Peers[0]
}
rg.Peers = peers
} else {
// We are deleting nodes here.
rg.Peers = rg.Peers[:newCfg.Replicas]
}

// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables or groups with R>1.
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 {
cca := ca.copyGroup()
// Adjust preferred as needed.
if numPeers == 1 && len(rg.Peers) > 1 {
cca.Group.Preferred = ca.Group.Peers[0]
} else {
cca.Group.Preferred = _EMPTY_
}
// Assign new peers.
cca.Group.Peers = rg.Peers
// We cannot propose these before the stream itself, so we collect them.
consumers = append(consumers, cca)
}
}
}

sa := &streamAssignment{Group: rg, Sync: osa.Sync, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
cc.meta.Propose(encodeUpdateStreamAssignment(sa))

// Process any staged consumers.
for _, ca := range consumers {
cc.meta.Propose(encodeAddConsumerAssignment(ca))
}
}

func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, stream, subject, reply string, rmsg []byte) {
@@ -4917,10 +4967,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype := mset.cfg.Name, mset.cfg.Storage
s, js, jsa, st, rf, outq := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
s, js, jsa, st, rf, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
mset.mu.RUnlock()

// This should not happen, but it is possible now that we allow scale up and scale down, which could trigger this.
if node == nil {
return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
}

// Check here preemptively if we have exceeded this server's limits.
if js.limitsExceeded(stype) {
s.resourcesExeededError()
@@ -5004,7 +5059,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.clseq++

// Do proposal.
err := mset.node.Propose(esm)
err := node.Propose(esm)
if err != nil && mset.clseq > 0 {
mset.clseq--
}
93 changes: 93 additions & 0 deletions server/jetstream_cluster_test.go
@@ -10587,6 +10587,99 @@ func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) {
updateReplicas(1)
}

func TestJetStreamClusterStreamReplicaUpdateFunctionCheck(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Start out at R3
cfg := &nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
}
_, err := js.AddStream(cfg)
require_NoError(t, err)

sub, err := js.SubscribeSync("foo", nats.Durable("cat"))
require_NoError(t, err)

numMsgs := 10
for i := 0; i < numMsgs; i++ {
_, err := js.Publish("foo", []byte("HELLO WORLD"))
require_NoError(t, err)
}
checkSubsPending(t, sub, numMsgs)

// Now ask the leader to step down.
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
require_NoError(t, err)

var sdResp JSApiStreamLeaderStepDownResponse
err = json.Unmarshal(rmsg.Data, &sdResp)
require_NoError(t, err)

if sdResp.Error != nil || !sdResp.Success {
t.Fatalf("Unexpected error: %+v", sdResp.Error)
}

c.waitOnStreamLeader("$G", "TEST")

updateReplicas := func(r int) {
t.Helper()
cfg.Replicas = r
_, err := js.UpdateStream(cfg)
require_NoError(t, err)
c.waitOnStreamLeader("$G", "TEST")
c.waitOnConsumerLeader("$G", "TEST", "cat")
ci, err := js.ConsumerInfo("TEST", "cat")
require_NoError(t, err)
if ci.Cluster.Leader == _EMPTY_ {
t.Fatalf("Expected a consumer leader but got none in consumer info")
}
if len(ci.Cluster.Replicas)+1 != r {
t.Fatalf("Expected consumer info to have %d peers, got %d", r, len(ci.Cluster.Replicas)+1)
}
}

// Scale down to 1.
updateReplicas(1)

// Make sure we can still send to the stream.
for i := 0; i < numMsgs; i++ {
_, err := js.Publish("foo", []byte("HELLO WORLD"))
require_NoError(t, err)
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs != uint64(2*numMsgs) {
t.Fatalf("Expected %d msgs, got %d", 3*numMsgs, si.State.Msgs)
}

checkSubsPending(t, sub, 2*numMsgs)

// Now back up.
updateReplicas(3)

// Send more.
for i := 0; i < numMsgs; i++ {
_, err := js.Publish("foo", []byte("HELLO WORLD"))
require_NoError(t, err)
}

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
if si.State.Msgs != uint64(3*numMsgs) {
t.Fatalf("Expected %d msgs, got %d", 3*numMsgs, si.State.Msgs)
}

checkSubsPending(t, sub, 3*numMsgs)
}

func TestJetStreamClusterStreamTagPlacement(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 4)
defer sc.shutdown()
6 changes: 5 additions & 1 deletion server/stream.go
@@ -3185,7 +3185,6 @@ func (mset *stream) internalLoop() {
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, msgs := mset.outq, mset.qch, mset.msgs
isClustered := mset.cfg.Replicas > 1

// For the ack msgs queue for interest retention.
var (
@@ -3241,6 +3240,11 @@
c.flushClients(0)
outq.recycle(&pms)
case <-msgs.ch:
// This can change now due to scale up/down, so it needs to be checked here.
mset.mu.RLock()
isClustered := mset.node != nil
mset.mu.RUnlock()

ims := msgs.pop()
for _, imi := range ims {
im := imi.(*inMsg)
