Skip to content

Commit

Permalink
Fix for #2885.
Browse files Browse the repository at this point in the history
When a filtered consumer who has no state, meaning no messages are being processed, it still will receive updates to properly track the delivered sequence as it relates to the entire stream.
Since we did not have state we were inadvertently skipping the compaction logic for the raft store.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 4, 2022
1 parent 30009fd commit ad6020a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server/jetstream_cluster.go
Expand Up @@ -3066,6 +3066,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
lastSnap = snap
}
}
} else {
// If we are here we may have no state but may be processing updates as a filtered consumer.
// Make sure the store does not grow too much, so similar to memory logic above, just compact.
_, _, applied := n.Progress()
n.Compact(applied)
}
}

Expand Down
49 changes: 49 additions & 0 deletions server/jetstream_cluster_test.go
Expand Up @@ -10776,6 +10776,55 @@ func TestJetStreamClusterStreamConsumersCount(t *testing.T) {
})
}

func TestJetStreamClusterFilteredAndIdleConsumerNRGGrowth(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

sname := "TEST"
_, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo.*"}, Replicas: 3})
require_NoError(t, err)

sub, err := js.SubscribeSync("foo.baz", nats.Durable("dlc"))
require_NoError(t, err)

for i := 0; i < 10_000; i++ {
js.PublishAsync("foo.bar", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

checkSubsPending(t, sub, 0)

// Grab consumer's underlying info and make sure NRG log not running away do to no-op skips on filtered consumer.
// Need a non-leader for the consumer, they are only ones getting skip ops to keep delivered updated.
cl := c.consumerLeader("$G", "TEST", "dlc")
var s *Server
for _, s = range c.servers {
if s != cl {
break
}
}

mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
o := mset.lookupConsumer("dlc")
if o == nil {
t.Fatalf("Error looking up consumer %q", "dlc")
}

// compactNumMin from monitorConsumer is 8192 atm.
const compactNumMin = 8192
if entries, _ := o.raftNode().Size(); entries > compactNumMin {
t.Fatalf("Expected <= %d entries, got %d", compactNumMin, entries)
}
}

// Support functions

// Used to setup superclusters for tests.
Expand Down

0 comments on commit ad6020a

Please sign in to comment.