diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6dfc447165..b89e8ceb97 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1645,6 +1645,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s resp.ApiResponse.Type = JSApiStreamCreateResponseType } + var clusterWideConsCount int + // If we are in clustered mode we need to be the stream leader to proceed. if s.JetStreamIsClustered() { // Check to make sure the stream is assigned. @@ -1655,6 +1657,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s js.mu.RLock() isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName) + if sa != nil { + clusterWideConsCount = len(sa.consumers) + } js.mu.RUnlock() if isLeader && sa == nil { @@ -1730,6 +1735,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s Domain: s.getOpts().JetStreamDomain, Cluster: js.clusterInfo(mset.raftGroup()), } + if clusterWideConsCount > 0 { + resp.StreamInfo.State.Consumers = clusterWideConsCount + } if mset.isMirror() { resp.StreamInfo.Mirror = mset.mirrorInfo() } else if mset.hasSources() { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 37d61403a9..5e18511136 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4250,7 +4250,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt }() var missingNames []string - sent := map[string]struct{}{} + sent := map[string]int{} // Send out our requests here. for _, sa := range streams { if s.allPeersOffline(sa.Group) { @@ -4261,7 +4261,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt } else { isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name) s.sendInternalMsgLocked(isubj, inbox, nil, nil) - sent[sa.Config.Name] = struct{}{} + sent[sa.Config.Name] = len(sa.consumers) } } // Don't hold lock. @@ -4284,6 +4284,10 @@ LOOP: resp.Missing = missingNames break LOOP case si := <-rc: + consCount := sent[si.Config.Name] + if consCount > 0 { + si.State.Consumers = consCount + } delete(sent, si.Config.Name) resp.Streams = append(resp.Streams, si) // Check to see if we are done. diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go index fe598dde92..383a15254b 100644 --- a/server/jetstream_cluster_test.go +++ b/server/jetstream_cluster_test.go @@ -10736,6 +10736,46 @@ func TestJetStreamClusterInterestRetentionWithFilteredConsumersExtra(t *testing. checkState(0) } +func TestJetStreamClusterStreamConsumersCount(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + sname := "TEST_STREAM_CONS_COUNT" + _, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo"}, Replicas: 3}) + require_NoError(t, err) + + // Create some R1 consumers + for i := 0; i < 10; i++ { + inbox := nats.NewInbox() + natsSubSync(t, nc, inbox) + _, err = js.AddConsumer(sname, &nats.ConsumerConfig{DeliverSubject: inbox}) + require_NoError(t, err) + } + + // Now check that the consumer count in stream info/list is 10 + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + // Check stream info + si, err := js.StreamInfo(sname) + if err != nil { + return fmt.Errorf("Error getting stream info: %v", err) + } + if n := si.State.Consumers; n != 10 { + return fmt.Errorf("From StreamInfo, expecting 10 consumers, got %v", n) + } + + // Now from stream list + for si := range js.StreamsInfo() { + if n := si.State.Consumers; n != 10 { + return fmt.Errorf("From StreamsInfo, expecting 10 consumers, got %v", n) + } + } + return nil + }) +} + // Support functions // Used to setup superclusters for tests.