diff --git a/server/consumer.go b/server/consumer.go
index 00515fc168..1b39ab6a48 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -868,12 +868,11 @@ func (o *consumer) setLeader(isLeader bool) {
 	} else {
 		// Shutdown the go routines and the subscriptions.
 		o.mu.Lock()
+		// ok if they are nil, we protect inside unsubscribe()
 		o.unsubscribe(o.ackSub)
 		o.unsubscribe(o.reqSub)
 		o.unsubscribe(o.fcSub)
-		o.ackSub = nil
-		o.reqSub = nil
-		o.fcSub = nil
+		o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
 		if o.infoSub != nil {
 			o.srv.sysUnsubscribe(o.infoSub)
 			o.infoSub = nil
@@ -3528,8 +3527,15 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 	}
 	o.closed = true
 
-	if dflag && advisory && o.isLeader() {
-		o.sendDeleteAdvisoryLocked()
+	// Check if we are the leader and are being deleted.
+	if dflag && o.isLeader() {
+		// If we are clustered and node leader (probable from above), stepdown.
+		if node := o.node; node != nil && node.Leader() {
+			node.StepDown()
+		}
+		if advisory {
+			o.sendDeleteAdvisoryLocked()
+		}
 	}
 
 	if o.qch != nil {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 89f9ef4df7..badd17621d 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1102,7 +1102,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) {
 	}
 }
 
-// Just copied over and changes out the group so it can be encoded.
+// Just copies over and changes out the group so it can be encoded.
 // Lock should be held.
 func (sa *streamAssignment) copyGroup() *streamAssignment {
 	csa, cg := *sa, *sa.Group
@@ -1111,6 +1111,15 @@ func (sa *streamAssignment) copyGroup() *streamAssignment {
 	return &csa
 }
 
+// Just copies over and changes out the group so it can be encoded.
+// Lock should be held.
+func (ca *consumerAssignment) copyGroup() *consumerAssignment {
+	cca, cg := *ca, *ca.Group
+	cca.Group = &cg
+	cca.Group.Peers = copyStrings(ca.Group.Peers)
+	return &cca
+}
+
 // Lock should be held.
 func (sa *streamAssignment) missingPeers() bool {
 	return len(sa.Group.Peers) < sa.Config.Replicas
@@ -1235,9 +1244,9 @@ func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer strin
 	for _, ca := range sa.consumers {
 		// Ephemerals are R=1, so only auto-remap durables, or R>1.
 		if ca.Config.Durable != _EMPTY_ {
-			cca := *ca
-			cca.Group.Peers = rg.Peers
-			cc.meta.Propose(encodeAddConsumerAssignment(&cca))
+			cca := ca.copyGroup()
+			cca.Group.Peers, cca.Group.Preferred = rg.Peers, _EMPTY_
+			cc.meta.Propose(encodeAddConsumerAssignment(cca))
 		} else if ca.Group.isMember(peer) {
 			// These are ephemerals. Check to see if we deleted this peer.
 			cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
@@ -1297,7 +1306,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
 		case assignConsumerOp:
 			ca, err := decodeConsumerAssignment(buf[1:])
 			if err != nil {
-				js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
+				js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
 				return didSnap, didRemove, err
 			}
 			if isRecovering {
@@ -1307,7 +1316,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
 		case assignCompressedConsumerOp:
 			ca, err := decodeConsumerAssignmentCompressed(buf[1:])
 			if err != nil {
-				js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:])
+				js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
 				return didSnap, didRemove, err
 			}
 			if isRecovering {
@@ -1317,7 +1326,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
 		case removeConsumerOp:
 			ca, err := decodeConsumerAssignment(buf[1:])
 			if err != nil {
-				js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
+				js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
 				return didSnap, didRemove, err
 			}
 			if isRecovering {
@@ -2194,6 +2203,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
 		// Check if we have a raft node running, meaning we are no longer part of the group but were.
 		js.mu.Lock()
 		if node := sa.Group.node; node != nil {
+			if node.Leader() {
+				node.StepDown()
+			}
 			node.ProposeRemovePeer(ourID)
 			didRemove = true
 		}
@@ -2687,6 +2699,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
 		sa.consumers = make(map[string]*consumerAssignment)
 	} else if oca := sa.consumers[ca.Name]; oca != nil {
 		wasExisting = true
+		// Copy over private existing state from former SA.
+		ca.Group.node = oca.Group.node
+		ca.responded = oca.responded
+		ca.err = oca.err
 	}
 
 	// Capture the optional state. We will pass it along if we are a member to apply.
@@ -2706,6 +2722,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
 		// Check if we have a raft node running, meaning we are no longer part of the group but were.
 		js.mu.Lock()
 		if node := ca.Group.node; node != nil {
+			if node.Leader() {
+				node.StepDown()
+			}
 			node.ProposeRemovePeer(ourID)
 		}
 		ca.Group.node = nil
@@ -3076,6 +3095,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 
 	// Track if we are leader.
 	var isLeader bool
+	recovering := true
 
 	for {
 		select {
@@ -3088,6 +3108,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 			for _, cei := range ces {
 				// No special processing needed for when we are caught up on restart.
 				if cei == nil {
+					recovering = false
 					if n.NeedSnapshot() {
 						doSnapshot()
 					}
@@ -3106,7 +3127,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 			}
 			aq.recycle(&ces)
 		case isLeader = <-lch:
-			if !isLeader && n.GroupLeader() != noLeader {
+			if recovering && !isLeader {
 				js.setConsumerAssignmentRecovering(ca)
 			}
 			js.processConsumerLeaderChange(o, isLeader)
diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index 5de969c78e..9d36d8237a 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -4110,11 +4110,10 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
 	for _, p := range si.Cluster.Replicas {
 		peers = append(peers, p.Name)
 	}
+	// Pick a truly random server to remove.
 	rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
 	toRemove := peers[0]
-	if cl := c.leader(); toRemove == cl.Name() {
-		toRemove = peers[1]
-	}
+
 	// First test bad peer.
 	req := &JSApiStreamRemovePeerRequest{Peer: "NOT VALID"}
 	jsreq, err := json.Marshal(req)
@@ -4140,6 +4139,7 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
 	}
+
 	resp, err = nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), jsreq, time.Second)
 	if err != nil {
 		t.Fatalf("Unexpected error: %v", err)
@@ -4151,6 +4151,8 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
 		t.Fatalf("Unexpected error: %+v", rpResp.Error)
 	}
 
+	c.waitOnStreamLeader("$G", "TEST")
+
 	checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
 		si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
 		if err != nil {