Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] JetStream stream info consumers count in clustered mode #2896

Merged
merged 1 commit into from Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/jetstream_api.go
Expand Up @@ -1645,6 +1645,8 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
resp.ApiResponse.Type = JSApiStreamCreateResponseType
}

var clusterWideConsCount int

// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
// Check to make sure the stream is assigned.
Expand All @@ -1655,6 +1657,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s

js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
if sa != nil {
clusterWideConsCount = len(sa.consumers)
}
js.mu.RUnlock()

if isLeader && sa == nil {
Expand Down Expand Up @@ -1730,6 +1735,9 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
Domain: s.getOpts().JetStreamDomain,
Cluster: js.clusterInfo(mset.raftGroup()),
}
if clusterWideConsCount > 0 {
resp.StreamInfo.State.Consumers = clusterWideConsCount
}
if mset.isMirror() {
resp.StreamInfo.Mirror = mset.mirrorInfo()
} else if mset.hasSources() {
Expand Down
8 changes: 6 additions & 2 deletions server/jetstream_cluster.go
Expand Up @@ -4250,7 +4250,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
}()

var missingNames []string
sent := map[string]struct{}{}
sent := map[string]int{}
// Send out our requests here.
for _, sa := range streams {
if s.allPeersOffline(sa.Group) {
Expand All @@ -4261,7 +4261,7 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
} else {
isubj := fmt.Sprintf(clusterStreamInfoT, sa.Client.serviceAccount(), sa.Config.Name)
s.sendInternalMsgLocked(isubj, inbox, nil, nil)
sent[sa.Config.Name] = struct{}{}
sent[sa.Config.Name] = len(sa.consumers)
}
}
// Don't hold lock.
Expand All @@ -4284,6 +4284,10 @@ LOOP:
resp.Missing = missingNames
break LOOP
case si := <-rc:
consCount := sent[si.Config.Name]
if consCount > 0 {
si.State.Consumers = consCount
}
delete(sent, si.Config.Name)
resp.Streams = append(resp.Streams, si)
// Check to see if we are done.
Expand Down
40 changes: 40 additions & 0 deletions server/jetstream_cluster_test.go
Expand Up @@ -10736,6 +10736,46 @@ func TestJetStreamClusterInterestRetentionWithFilteredConsumersExtra(t *testing.
checkState(0)
}

func TestJetStreamClusterStreamConsumersCount(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

sname := "TEST_STREAM_CONS_COUNT"
_, err := js.AddStream(&nats.StreamConfig{Name: sname, Subjects: []string{"foo"}, Replicas: 3})
require_NoError(t, err)

// Create some R1 consumers
for i := 0; i < 10; i++ {
inbox := nats.NewInbox()
natsSubSync(t, nc, inbox)
_, err = js.AddConsumer(sname, &nats.ConsumerConfig{DeliverSubject: inbox})
require_NoError(t, err)
}

// Now check that the consumer count in stream info/list is 10
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
// Check stream info
si, err := js.StreamInfo(sname)
if err != nil {
return fmt.Errorf("Error getting stream info: %v", err)
}
if n := si.State.Consumers; n != 10 {
return fmt.Errorf("From StreamInfo, expecting 10 consumers, got %v", n)
}

// Now from stream list
for si := range js.StreamsInfo() {
if n := si.State.Consumers; n != 10 {
return fmt.Errorf("From StreamsInfo, expecting 10 consumers, got %v", n)
}
}
return nil
})
}

// Support functions

// Used to setup superclusters for tests.
Expand Down