diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 2847640073..89f9ef4df7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3066,6 +3066,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { lastSnap = snap } } + } else { + // If we are here we may have no state but may be processing updates as a filtered consumer. + // Make sure the store does not grow too much, so similar to memory logic above, just compact. + _, _, applied := n.Progress() + n.Compact(applied) } } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 383a15254b..8876e119a7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10776,6 +10776,55 @@ func TestJetStreamClusterStreamConsumersCount(t *testing.T) { }) } +func TestJetStreamClusterFilteredAndIdleConsumerNRGGrowth(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sname := "TEST" + _, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo.*"}, Replicas: 3}) + require_NoError(t, err) + + sub, err := js.SubscribeSync("foo.baz", nats.Durable("dlc")) + require_NoError(t, err) + + for i := 0; i < 10_000; i++ { + js.PublishAsync("foo.bar", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + checkSubsPending(t, sub, 0) + + // Grab consumer's underlying info and make sure NRG log not running away do to no-op skips on filtered consumer. + // Need a non-leader for the consumer, they are only ones getting skip ops to keep delivered updated. + cl := c.consumerLeader("$G", "TEST", "dlc") + var s *Server + for _, s = range c.servers { + if s != cl { + break + } + } + + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("dlc") + if o == nil { + t.Fatalf("Error looking up consumer %q", "dlc") + } + + // compactNumMin from monitorConsumer is 8192 atm. + const compactNumMin = 8192 + if entries, _ := o.raftNode().Size(); entries > compactNumMin { + t.Fatalf("Expected <= %d entries, got %d", compactNumMin, entries) + } +} + // Support functions // Used to setup superclusters for tests.