Skip to content

Commit

Permalink
Merge pull request #2904 from nats-io/peer_remove_bad_consumer_state
Browse files Browse the repository at this point in the history
[FIXED] Inconsistent durable consumer state after stream peer removal
  • Loading branch information
derekcollison committed Mar 6, 2022
2 parents 54da758 + 31a1972 commit eb1ed55
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
16 changes: 11 additions & 5 deletions server/consumer.go
Expand Up @@ -868,12 +868,11 @@ func (o *consumer) setLeader(isLeader bool) {
} else {
// Shutdown the go routines and the subscriptions.
o.mu.Lock()
// ok if they are nil, we protect inside unsubscribe()
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
o.unsubscribe(o.fcSub)
o.ackSub = nil
o.reqSub = nil
o.fcSub = nil
o.ackSub, o.reqSub, o.fcSub = nil, nil, nil
if o.infoSub != nil {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
Expand Down Expand Up @@ -3528,8 +3527,15 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
}
o.closed = true

if dflag && advisory && o.isLeader() {
o.sendDeleteAdvisoryLocked()
// Check if we are the leader and are being deleted.
if dflag && o.isLeader() {
// If we are clustered and node leader (probable from above), stepdown.
if node := o.node; node != nil && node.Leader() {
node.StepDown()
}
if advisory {
o.sendDeleteAdvisoryLocked()
}
}

if o.qch != nil {
Expand Down
37 changes: 29 additions & 8 deletions server/jetstream_cluster.go
Expand Up @@ -1102,7 +1102,7 @@ func (js *jetStream) setConsumerAssignmentRecovering(ca *consumerAssignment) {
}
}

// Just copied over and changes out the group so it can be encoded.
// Just copies over and changes out the group so it can be encoded.
// Lock should be held.
func (sa *streamAssignment) copyGroup() *streamAssignment {
csa, cg := *sa, *sa.Group
Expand All @@ -1111,6 +1111,15 @@ func (sa *streamAssignment) copyGroup() *streamAssignment {
return &csa
}

// Just copies over and changes out the group so it can be encoded.
// Lock should be held.
func (ca *consumerAssignment) copyGroup() *consumerAssignment {
	// Shallow-copy the assignment, then give it its own group so the
	// encoded copy cannot alias the original's peer list.
	dup := *ca
	group := *ca.Group
	group.Peers = copyStrings(ca.Group.Peers)
	dup.Group = &group
	return &dup
}

// Lock should be held.
func (sa *streamAssignment) missingPeers() bool {
return len(sa.Group.Peers) < sa.Config.Replicas
Expand Down Expand Up @@ -1235,9 +1244,9 @@ func (js *jetStream) removePeerFromStreamLocked(sa *streamAssignment, peer strin
for _, ca := range sa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1.
if ca.Config.Durable != _EMPTY_ {
cca := *ca
cca.Group.Peers = rg.Peers
cc.meta.Propose(encodeAddConsumerAssignment(&cca))
cca := ca.copyGroup()
cca.Group.Peers, cca.Group.Preferred = rg.Peers, _EMPTY_
cc.meta.Propose(encodeAddConsumerAssignment(cca))
} else if ca.Group.isMember(peer) {
// These are ephemerals. Check to see if we deleted this peer.
cc.meta.Propose(encodeDeleteConsumerAssignment(ca))
Expand Down Expand Up @@ -1297,7 +1306,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
case assignConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
}
if isRecovering {
Expand All @@ -1307,7 +1316,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
case assignCompressedConsumerOp:
ca, err := decodeConsumerAssignmentCompressed(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assigment: %q", buf[1:])
js.srv.Errorf("JetStream cluster failed to decode compressed consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
}
if isRecovering {
Expand All @@ -1317,7 +1326,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, isRecovering bool) (bool
case removeConsumerOp:
ca, err := decodeConsumerAssignment(buf[1:])
if err != nil {
js.srv.Errorf("JetStream cluster failed to decode consumer assigment: %q", buf[1:])
js.srv.Errorf("JetStream cluster failed to decode consumer assignment: %q", buf[1:])
return didSnap, didRemove, err
}
if isRecovering {
Expand Down Expand Up @@ -2194,6 +2203,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
// Check if we have a raft node running, meaning we are no longer part of the group but were.
js.mu.Lock()
if node := sa.Group.node; node != nil {
if node.Leader() {
node.StepDown()
}
node.ProposeRemovePeer(ourID)
didRemove = true
}
Expand Down Expand Up @@ -2687,6 +2699,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
sa.consumers = make(map[string]*consumerAssignment)
} else if oca := sa.consumers[ca.Name]; oca != nil {
wasExisting = true
// Copy over private existing state from former SA.
ca.Group.node = oca.Group.node
ca.responded = oca.responded
ca.err = oca.err
}

// Capture the optional state. We will pass it along if we are a member to apply.
Expand All @@ -2706,6 +2722,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Check if we have a raft node running, meaning we are no longer part of the group but were.
js.mu.Lock()
if node := ca.Group.node; node != nil {
if node.Leader() {
node.StepDown()
}
node.ProposeRemovePeer(ourID)
}
ca.Group.node = nil
Expand Down Expand Up @@ -3076,6 +3095,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {

// Track if we are leader.
var isLeader bool
recovering := true

for {
select {
Expand All @@ -3088,6 +3108,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
for _, cei := range ces {
// No special processing needed for when we are caught up on restart.
if cei == nil {
recovering = false
if n.NeedSnapshot() {
doSnapshot()
}
Expand All @@ -3106,7 +3127,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}
aq.recycle(&ces)
case isLeader = <-lch:
if !isLeader && n.GroupLeader() != noLeader {
if recovering && !isLeader {
js.setConsumerAssignmentRecovering(ca)
}
js.processConsumerLeaderChange(o, isLeader)
Expand Down
8 changes: 5 additions & 3 deletions server/jetstream_cluster_test.go
Expand Up @@ -4110,11 +4110,10 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
for _, p := range si.Cluster.Replicas {
peers = append(peers, p.Name)
}
// Pick a truly random server to remove.
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
toRemove := peers[0]
if cl := c.leader(); toRemove == cl.Name() {
toRemove = peers[1]
}

// First test bad peer.
req := &JSApiStreamRemovePeerRequest{Peer: "NOT VALID"}
jsreq, err := json.Marshal(req)
Expand All @@ -4140,6 +4139,7 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

resp, err = nc.Request(fmt.Sprintf(JSApiStreamRemovePeerT, "TEST"), jsreq, time.Second)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
Expand All @@ -4151,6 +4151,8 @@ func TestJetStreamClusterStreamRemovePeer(t *testing.T) {
t.Fatalf("Unexpected error: %+v", rpResp.Error)
}

c.waitOnStreamLeader("$G", "TEST")

checkFor(t, 10*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("TEST", nats.MaxWait(time.Second))
if err != nil {
Expand Down

0 comments on commit eb1ed55

Please sign in to comment.