From ad6020ae720fedc419edc925ece181ba06663418 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Fri, 4 Mar 2022 06:31:47 -0800 Subject: [PATCH] Fix for #2885. A filtered consumer that has no state, meaning no messages are being processed, will still receive updates to properly track the delivered sequence as it relates to the entire stream. Since we did not have state we were inadvertently skipping the compaction logic for the raft store. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 5 ++++ server/jetstream_cluster_test.go | 49 ++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 2847640073..89f9ef4df7 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3066,6 +3066,11 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { lastSnap = snap } } + } else { + // If we are here we may have no state but may be processing updates as a filtered consumer. + // Make sure the store does not grow too much, so similar to memory logic above, just compact. 
+ _, _, applied := n.Progress() + n.Compact(applied) } } diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index 383a15254b..8876e119a7 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10776,6 +10776,55 @@ func TestJetStreamClusterStreamConsumersCount(t *testing.T) { }) } +func TestJetStreamClusterFilteredAndIdleConsumerNRGGrowth(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sname := "TEST" + _, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo.*"}, Replicas: 3}) + require_NoError(t, err) + + sub, err := js.SubscribeSync("foo.baz", nats.Durable("dlc")) + require_NoError(t, err) + + for i := 0; i < 10_000; i++ { + js.PublishAsync("foo.bar", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + checkSubsPending(t, sub, 0) + + // Grab consumer's underlying info and make sure the NRG log is not running away due to no-op skips on a filtered consumer. + // Need a non-leader for the consumer; they are the only ones getting skip ops to keep delivered updated. + cl := c.consumerLeader("$G", "TEST", "dlc") + var s *Server + for _, s = range c.servers { + if s != cl { + break + } + } + + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("dlc") + if o == nil { + t.Fatalf("Error looking up consumer %q", "dlc") + } + + // compactNumMin from monitorConsumer is 8192 atm. + const compactNumMin = 8192 + if entries, _ := o.raftNode().Size(); entries > compactNumMin { + t.Fatalf("Expected <= %d entries, got %d", compactNumMin, entries) + } +} + // Support functions // Used to setup superclusters for tests.