From dfe96944d2656e532c243693d441beb8469da5ee Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 3 Mar 2022 09:44:26 -0700 Subject: [PATCH] [FIXED] JetStream stream info consumers count in clustered mode In clustering mode, the number of consumers in stream info may be wrong in presence of non durable consumers. Ephemeral are handled by specific nodes. The StreamInfo response would contain only the consumer count that the stream leader is handling. This fix overrides the stream's state consumers count with the number of consumers from the stream assignment record. Resolves #2895 Signed-off-by: Ivan Kozlovic --- server/jetstream_api.go | 8 +++++++ server/jetstream_cluster.go | 8 +++++-- server/jetstream_cluster_test.go | 40 ++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6dfc447165..b89e8ceb97 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1645,6 +1645,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s resp.ApiResponse.Type = JSApiStreamCreateResponseType } + var clusterWideConsCount int + // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { // Check to make sure the stream is assigned. @@ -1655,6 +1657,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s js.mu.RLock() isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName) + if sa != nil { + clusterWideConsCount = len(sa.consumers) + } js.mu.RUnlock() if isLeader && sa == nil { @@ -1730,6 +1735,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s Domain: s.getOpts().JetStreamDomain, Cluster: js.clusterInfo(mset.raftGroup()), } + if clusterWideConsCount > 0 { + resp.StreamInfo.State.Consumers = clusterWideConsCount + } if mset.isMirror() { resp.StreamInfo.Mirror = mset.mirrorInfo() } else if mset.hasSources() { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 37d61403a9..5e18511136 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4250,7 +4250,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt }() var missingNames []string - sent := map[string]struct{}{} + sent := map[string]int{} // Send out our requests here. for _, sa := range streams { if s.allPeersOffline(sa.Group) { @@ -4261,7 +4261,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt } else { isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) - sent[sa.Config.Name] = struct{}{} + sent[sa.Config.Name] = len(sa.consumers) } } // Don't hold lock. @@ -4284,6 +4284,10 @@ LOOP: resp.Missing = missingNames break LOOP case si := <-rc: + consCount := sent[si.Config.Name] + if consCount > 0 { + si.State.Consumers = consCount + } delete(sent, si.Config.Name) resp.Streams = append(resp.Streams, si) // Check to see if we are done. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index fe598dde92..383a15254b 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10736,6 +10736,46 @@ func TestJetStreamClusterInterestRetentionWithFilteredConsumersExtra(t *testing. checkState(0) } +func TestJetStreamClusterStreamConsumersCount(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sname := "TEST_STREAM_CONS_COUNT" + _, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo"}, Replicas: 3}) + require_NoError(t, err) + + // Create some R1 consumers + for i := 0; i < 10; i++ { + inbox := nats.NewInbox() + natsSubSync(t, nc, inbox) + _, err = js.AddConsumer(sname, &nats.ConsumerConfig{DeliverSubject: inbox}) + require_NoError(t, err) + } + + // Now check that the consumer count in stream info/list is 10 + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + // Check stream info + si, err := js.StreamInfo(sname) + if err != nil { + return fmt.Errorf("Error getting stream info: %v", err) + } + if n := si.State.Consumers; n != 10 { + return fmt.Errorf("From StreamInfo, expecting 10 consumers, got %v", n) + } + + // Now from stream list + for si := range js.StreamsInfo() { + if n := si.State.Consumers; n != 10 { + return fmt.Errorf("From StreamsInfo, expecting 10 consumers, got %v", n) + } + } + return nil + }) +} + // Support functions // Used to setup superclusters for tests.