From 58da4b917a9b9c7d365730b588f62aea21eea022 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Sun, 6 Mar 2022 16:22:45 -0800
Subject: [PATCH] Made improvements to scale up and down for streams and
 consumers.

Signed-off-by: Derek Collison
---
 server/consumer.go               | 10 ++++
 server/jetstream_cluster.go      | 67 ++++++++++++++++++++---
 server/jetstream_cluster_test.go | 93 ++++++++++++++++++++++++++++++++
 server/stream.go                 |  6 ++-
 4 files changed, 169 insertions(+), 7 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 1b39ab6a48..b3b99eb092 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -749,6 +749,16 @@ func (o *consumer) checkQueueInterest() {
 	}
 }
 
+// Clears our node if we have one. Used when we scale down to 1.
+func (o *consumer) clearNode() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.node != nil {
+		o.node.Delete()
+		o.node = nil
+	}
+}
+
 // Lock should be held.
 func (o *consumer) isLeader() bool {
 	if o.node != nil {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index badd17621d..a6dc64170e 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1159,10 +1159,10 @@ func (js *jetStream) processAddPeer(peer string) {
 			cc.meta.Propose(encodeAddStreamAssignment(csa))
 			for _, ca := range sa.consumers {
 				// Ephemerals are R=1, so only auto-remap durables, or R>1.
-				if ca.Config.Durable != _EMPTY_ {
-					cca := *ca
+				if ca.Config.Durable != _EMPTY_ || len(ca.Group.Peers) > 1 {
+					cca := ca.copyGroup()
 					cca.Group.Peers = csa.Group.Peers
-					cc.meta.Propose(encodeAddConsumerAssignment(&cca))
+					cc.meta.Propose(encodeAddConsumerAssignment(cca))
 				}
 			}
 		}
@@ -2313,6 +2313,7 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 	mset, err := acc.lookupStream(sa.Config.Name)
 	if err == nil && mset != nil {
+		var needsSetLeader bool
 		if !alreadyRunning && numReplicas > 1 {
 			if needsNode {
 				js.createRaftGroup(rg, storage)
 			}
@@ -2321,6 +2322,10 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 		} else if numReplicas == 1 && alreadyRunning {
 			// We downgraded to R1. Make sure we cleanup the raft node and the stream monitor.
 			mset.removeNode()
+			// Make sure we are leader now that we are R1.
+			needsSetLeader = true
+			// In case we need to shut down the cluster specific subs, etc.
+			mset.setLeader(false)
 			js.mu.Lock()
 			sa.Group.node = nil
 			js.mu.Unlock()
@@ -2330,6 +2335,10 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 			s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
 			mset.setStreamAssignment(osa)
 		}
+		// Make sure we are the leader now that we are R1.
+		if needsSetLeader {
+			mset.setLeader(true)
+		}
 	}
 
 	// If not found we must be expanding into this node since if we are here we know we are a member.
@@ -2892,7 +2901,19 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
 	} else {
 		if didCreate {
 			o.setCreatedTime(ca.Created)
+		} else {
+			// Check for scale down to 1.
+			if rg.node != nil && len(rg.Peers) == 1 {
+				o.clearNode()
+				o.setLeader(true)
+				// Need to clear from rg too.
+				js.mu.Lock()
+				rg.node = nil
+				js.mu.Unlock()
+				return
+			}
 		}
+
 		// Start our monitoring routine.
 		if rg.node == nil {
 			// Single replica consumer, process manually here.
@@ -4027,6 +4048,8 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 
 	// Check for replica changes.
 	rg := osa.Group
+	var consumers []*consumerAssignment
+
 	if newCfg.Replicas != len(rg.Peers) {
 		// We are adding new peers here.
 		if newCfg.Replicas > len(rg.Peers) {
@@ -4037,18 +4060,45 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
 				return
 			}
 			// Single nodes are not recorded by the NRG layer so we can rename.
-			if len(rg.Peers) == 1 {
+			if len(peers) == 1 {
 				rg.Name = groupNameForStream(peers, rg.Storage)
+			} else if len(rg.Peers) == 1 {
+				// This is a scale up from being a singleton, so set preferred to that singleton.
+				rg.Preferred = rg.Peers[0]
 			}
 			rg.Peers = peers
 		} else {
 			// We are deleting nodes here.
 			rg.Peers = rg.Peers[:newCfg.Replicas]
 		}
+
+		// Need to remap any consumers.
+		for _, ca := range osa.consumers {
+			// Ephemerals are R=1, so only auto-remap durables, or R>1.
+			numPeers := len(ca.Group.Peers)
+			if ca.Config.Durable != _EMPTY_ || numPeers > 1 {
+				cca := ca.copyGroup()
+				// Adjust preferred as needed.
+				if numPeers == 1 && len(rg.Peers) > 1 {
+					cca.Group.Preferred = ca.Group.Peers[0]
+				} else {
+					cca.Group.Preferred = _EMPTY_
+				}
+				// Assign new peers.
+				cca.Group.Peers = rg.Peers
+				// We can not propose these here before the stream itself, so we collect them.
+				consumers = append(consumers, cca)
+			}
+		}
 	}
 
 	sa := &streamAssignment{Group: rg, Sync: osa.Sync, Config: newCfg, Subject: subject, Reply: reply, Client: ci}
 	cc.meta.Propose(encodeUpdateStreamAssignment(sa))
+
+	// Process any staged consumers.
+	for _, ca := range consumers {
+		cc.meta.Propose(encodeAddConsumerAssignment(ca))
+	}
 }
 
 func (s *Server) jsClusteredStreamDeleteRequest(ci *ClientInfo, acc *Account, stream, subject, reply string, rmsg []byte) {
@@ -4917,10 +4967,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	mset.mu.RLock()
 	canRespond := !mset.cfg.NoAck && len(reply) > 0
 	name, stype := mset.cfg.Name, mset.cfg.Storage
-	s, js, jsa, st, rf, outq := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
+	s, js, jsa, st, rf, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq, mset.node
 	maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
 	mset.mu.RUnlock()
 
+	// This should not happen, but is possible now that we allow scale up and scale down, where this could trigger.
+	if node == nil {
+		return mset.processJetStreamMsg(subject, reply, hdr, msg, 0, 0)
+	}
+
 	// Check here pre-emptively if we have exceeded this server limits.
 	if js.limitsExceeded(stype) {
 		s.resourcesExeededError()
@@ -5004,7 +5059,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
 	mset.clseq++
 
 	// Do proposal.
-	err := mset.node.Propose(esm)
+	err := node.Propose(esm)
 	if err != nil && mset.clseq > 0 {
 		mset.clseq--
 	}
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 9d36d8237a..192da6261a 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -10587,6 +10587,99 @@ func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) {
 	updateReplicas(1)
 }
 
+func TestJetStreamClusterStreamReplicaUpdateFunctionCheck(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	// Client based API
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Start out at R3.
+	cfg := &nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	}
+	_, err := js.AddStream(cfg)
+	require_NoError(t, err)
+
+	sub, err := js.SubscribeSync("foo", nats.Durable("cat"))
+	require_NoError(t, err)
+
+	numMsgs := 10
+	for i := 0; i < numMsgs; i++ {
+		_, err := js.Publish("foo", []byte("HELLO WORLD"))
+		require_NoError(t, err)
+	}
+	checkSubsPending(t, sub, numMsgs)
+
+	// Now ask leader to stepdown.
+	rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, time.Second)
+	require_NoError(t, err)
+
+	var sdResp JSApiStreamLeaderStepDownResponse
+	err = json.Unmarshal(rmsg.Data, &sdResp)
+	require_NoError(t, err)
+
+	if sdResp.Error != nil || !sdResp.Success {
+		t.Fatalf("Unexpected error: %+v", sdResp.Error)
+	}
+
+	c.waitOnStreamLeader("$G", "TEST")
+
+	updateReplicas := func(r int) {
+		t.Helper()
+		cfg.Replicas = r
+		_, err := js.UpdateStream(cfg)
+		require_NoError(t, err)
+		c.waitOnStreamLeader("$G", "TEST")
+		c.waitOnConsumerLeader("$G", "TEST", "cat")
+		ci, err := js.ConsumerInfo("TEST", "cat")
+		require_NoError(t, err)
+		if ci.Cluster.Leader == _EMPTY_ {
+			t.Fatalf("Expected a consumer leader but got none in consumer info")
+		}
+		if len(ci.Cluster.Replicas)+1 != r {
+			t.Fatalf("Expected consumer info to have %d peers, got %d", r, len(ci.Cluster.Replicas)+1)
+		}
+	}
+
+	// Scale down to 1.
+	updateReplicas(1)
+
+	// Make sure we can still send to the stream.
+	for i := 0; i < numMsgs; i++ {
+		_, err := js.Publish("foo", []byte("HELLO WORLD"))
+		require_NoError(t, err)
+	}
+
+	si, err := js.StreamInfo("TEST")
+	require_NoError(t, err)
+	if si.State.Msgs != uint64(2*numMsgs) {
+		t.Fatalf("Expected %d msgs, got %d", 2*numMsgs, si.State.Msgs)
+	}
+
+	checkSubsPending(t, sub, 2*numMsgs)
+
+	// Now back up.
+	updateReplicas(3)
+
+	// Send more.
+	for i := 0; i < numMsgs; i++ {
+		_, err := js.Publish("foo", []byte("HELLO WORLD"))
+		require_NoError(t, err)
+	}
+
+	si, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+	if si.State.Msgs != uint64(3*numMsgs) {
+		t.Fatalf("Expected %d msgs, got %d", 3*numMsgs, si.State.Msgs)
+	}
+
+	checkSubsPending(t, sub, 3*numMsgs)
+}
+
 func TestJetStreamClusterStreamTagPlacement(t *testing.T) {
 	sc := createJetStreamSuperCluster(t, 3, 4)
 	defer sc.shutdown()
diff --git a/server/stream.go b/server/stream.go
index 41ff5444b7..c5cd88a9d0 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -3185,7 +3185,6 @@ func (mset *stream) internalLoop() {
 	c.registerWithAccount(mset.acc)
 	defer c.closeConnection(ClientClosed)
 	outq, qch, msgs := mset.outq, mset.qch, mset.msgs
-	isClustered := mset.cfg.Replicas > 1
 
 	// For the ack msgs queue for interest retention.
 	var (
@@ -3241,6 +3240,11 @@
 			c.flushClients(0)
 			outq.recycle(&pms)
 		case <-msgs.ch:
+			// This can possibly change now, so it needs to be checked here.
+			mset.mu.RLock()
+			isClustered := mset.node != nil
+			mset.mu.RUnlock()
+
 			ims := msgs.pop()
 			for _, imi := range ims {
 				im := imi.(*inMsg)
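
For review context: the scale up/down paths above are driven entirely by an ordinary stream config update that changes Replicas. What follows is a minimal client-side sketch of the R3 -> R1 -> R3 cycle the new test exercises, written against the nats.go client; the connection URL, stream name, and subject are placeholders, and it assumes a JetStream-enabled cluster is already running.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Placeholder URL; assumes a JetStream-enabled cluster is reachable here.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Start out at R3, mirroring the test above ("TEST" and "foo" are placeholders).
	cfg := &nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3}
	if _, err := js.AddStream(cfg); err != nil {
		log.Fatal(err)
	}

	// Scale down to R1. With this patch the server removes the raft node,
	// has the stream assume leadership directly, and remaps durable (or R>1)
	// consumers onto the shrunken peer set.
	cfg.Replicas = 1
	if _, err := js.UpdateStream(cfg); err != nil {
		log.Fatal(err)
	}

	// Scale back up to R3. The former singleton is set as the preferred
	// leader while the new peers are added and catch up.
	cfg.Replicas = 3
	si, err := js.UpdateStream(cfg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("stream %q now has %d replicas\n", si.Config.Name, si.Config.Replicas)
}

Ordering is the subtle part on the server side: jsClusteredStreamUpdateRequest proposes the stream assignment first and only then the staged consumer assignments, so a consumer assignment never references a peer set its stream has not yet adopted.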