add implementation for consumer replica change #3293

Merged: 2 commits, Jul 27, 2022
Changes from 1 commit
8 changes: 7 additions & 1 deletion server/jetstream_api.go
@@ -3593,7 +3593,13 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
}

if isClustered && !req.Config.Direct {
- s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
+ // If we are inline with client, we still may need to do a callout for consumer info
+ // during this call, so place in Go routine to not block client.
+ if c.kind != ROUTER && c.kind != GATEWAY {
+ go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
+ } else {
+ s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
+ }
return
}

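Why the goroutine above: the clustered consumer request may itself perform a blocking consumer-info callout, so requests arriving on ordinary client connections are moved off the connection's processing path, while ROUTER and GATEWAY traffic stays inline. A minimal, self-contained Go sketch of that dispatch pattern (the kind constants and handler below are illustrative stand-ins, not the server's actual types):

package main

import (
	"fmt"
	"sync"
)

type kind int

const (
	CLIENT kind = iota
	ROUTER
	GATEWAY
)

// handle runs work in its own goroutine for ordinary client connections,
// since the work may block on a callout; internal route/gateway traffic
// is processed inline.
func handle(k kind, wg *sync.WaitGroup, work func()) {
	if k != ROUTER && k != GATEWAY {
		wg.Add(1)
		go func() { defer wg.Done(); work() }()
		return
	}
	work()
}

func main() {
	var wg sync.WaitGroup
	handle(CLIENT, &wg, func() { fmt.Println("async clustered consumer request") })
	handle(ROUTER, &wg, func() { fmt.Println("inline clustered consumer request") })
	wg.Wait()
}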
161 changes: 117 additions & 44 deletions server/jetstream_cluster.go
Expand Up @@ -1878,6 +1878,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case <-uch:
// keep stream assignment current
sa = mset.streamAssignment()
// keep peer list up to date with config
js.checkPeers(mset.raftGroup())
// We get this when we have a new stream assignment caused by an update.
// We want to know if we are migrating.
migrating := mset.isMigrating()
@@ -3739,7 +3741,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
// We get this when we have a new consumer assignment caused by an update.
// We want to know if we are migrating.
rg := o.raftGroup()

// keep peer list up to date with config
js.checkPeers(rg)
// If we are migrating, monitor for the new peers to be caught up.
if isLeader && len(rg.Peers) != o.replica() {
startMigrationMonitoring()
@@ -4764,6 +4767,52 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
cc.meta.Propose(encodeAddStreamAssignment(sa))
}

// blocking utility call to perform requests on the system account
// returns (synchronized) v or error
func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) {
isubj := fmt.Sprintf(subjFormat, args...)
s.mu.Lock()
inbox := s.newRespInbox()
results := make(chan interface{}, 1)
// Store our handler.
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
if err := json.Unmarshal(msg, v); err != nil {
s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err)
return
}
select {
case results <- v:
default:
s.Warnf("Failed placing request response on internal channel")
}
}
s.mu.Unlock()

s.sendInternalMsgLocked(isubj, inbox, nil, nil)

const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

var err error
var data interface{}

select {
case <-s.quitCh:
err = fmt.Errorf("server shutdown while waiting for response")
case <-notActive.C:
err = fmt.Errorf("timeout")
case data = <-results:
}
// Clean up here.
s.mu.Lock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
s.mu.Unlock()
return data, err
}
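The helper above decodes the JSON reply into the pointer passed as v and hands the same value back as interface{}; callers type-assert the result, as the call sites later in this diff do. A minimal, runnable model of that decode-into-pointer contract, with the inbox and timeout plumbing left out (the StreamInfo stand-in below is illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative stand-in for the server's StreamInfo; only the fields
// needed for the example.
type StreamInfo struct {
	Cluster *struct {
		Leader string `json:"leader"`
	} `json:"cluster,omitempty"`
}

// sysRequestModel mimics sysRequest's contract: v must be a pointer the
// JSON reply is decoded into, and the same value is returned as interface{}.
func sysRequestModel(v interface{}, reply []byte) (interface{}, error) {
	if err := json.Unmarshal(reply, v); err != nil {
		return nil, err
	}
	return v, nil
}

func main() {
	reply := []byte(`{"cluster":{"leader":"n1"}}`)
	si, err := sysRequestModel(&StreamInfo{}, reply)
	if err != nil {
		panic(err)
	}
	// Callers type-assert back to the concrete pointer type.
	if cl := si.(*StreamInfo).Cluster; cl != nil {
		fmt.Println("leader:", cl.Leader) // leader: n1
	}
}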

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
@@ -4891,48 +4940,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
if !s.allPeersOffline(rg) {
// Need to release js lock.
js.mu.Unlock()
- s.mu.Lock()
- inbox := s.newRespInbox()
- results := make(chan *StreamInfo, 1)
- // Store our handler.
- s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
- var si StreamInfo
- if err := json.Unmarshal(msg, &si); err != nil {
- s.Warnf("Error unmarshaling clustered stream info response:%v", err)
- return
- }
- select {
- case results <- &si:
- default:
- s.Warnf("Failed placing remote stream info result on internal channel")
- }
- }
- s.mu.Unlock()
-
- isubj := fmt.Sprintf(clusterStreamInfoT, ci.serviceAccount(), cfg.Name)
- s.sendInternalMsgLocked(isubj, inbox, nil, nil)
-
- const timeout = 2 * time.Second
- notActive := time.NewTimer(timeout)
- defer notActive.Stop()
-
- select {
- case <-s.quitCh:
- break
- case <-notActive.C:
- s.Warnf("Did not receive stream info results for '%s > %s'", acc, cfg.Name)
- case si := <-results:
- if si.Cluster != nil {
- // The leader here is the server name, but need to convert to internal name.
- curLeader = string(getHash(si.Cluster.Leader))
- }
- }
+ if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
+ s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
+ } else if cl := si.(*StreamInfo).Cluster; cl != nil {
+ curLeader = string(getHash(cl.Leader))
+ }
- // Clean up here.
- s.mu.Lock()
- if s.sys != nil && s.sys.replies != nil {
- delete(s.sys.replies, inbox)
- }
- s.mu.Unlock()
// Re-acquire here.
js.mu.Lock()
}
@@ -5807,13 +5819,74 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
Created: time.Now().UTC(),
}
} else {
nca := ca.copyGroup()

rBefore := ca.Config.replicas(sa.Config)
rAfter := cfg.replicas(sa.Config)

var curLeader string
if rBefore != rAfter {
// We are modifying nodes here. We want to do our best to preserve the current leader.
// We have support now from above that guarantees we are in our own Go routine, so can
// ask for stream info from the stream leader to make sure we keep the leader in the new list.
if !s.allPeersOffline(ca.Group) {
// Need to release js lock.
js.mu.Unlock()
if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err)
} else if cl := ci.(*ConsumerInfo).Cluster; cl != nil {
curLeader = string(getHash(cl.Leader))
}
// Re-acquire here.
js.mu.Lock()
}
}

if rBefore < rAfter {
newPeerSet := nca.Group.Peers
// scale up by adding new members from the stream peer set that are not yet in the consumer peer set
streamPeerSet := copyStrings(sa.Group.Peers)
rand.Shuffle(rAfter, func(i, j int) { streamPeerSet[i], streamPeerSet[j] = streamPeerSet[j], streamPeerSet[i] })
for _, p := range streamPeerSet {
found := false
for _, sp := range newPeerSet {
if sp == p {
found = true
break
}
}
if !found {
newPeerSet = append(newPeerSet, p)
if len(newPeerSet) == rAfter {
break
}
}
}
nca.Group.Peers = newPeerSet
nca.Group.Preferred = curLeader
} else if rBefore > rAfter {
newPeerSet := nca.Group.Peers
// mark leader preferred and move it to end
nca.Group.Preferred = curLeader
if nca.Group.Preferred != _EMPTY_ {
for i, p := range newPeerSet {
if nca.Group.Preferred == p {
newPeerSet[i] = newPeerSet[len(newPeerSet)-1]
newPeerSet[len(newPeerSet)-1] = p
}
}
}
// scale down by trimming peers from the front, keeping the preferred leader (now at the end)
newPeerSet = newPeerSet[len(newPeerSet)-rAfter:]
nca.Group.Peers = newPeerSet
}

// Update config and client info on copy of existing.
- nca := *ca
nca.Config = cfg
nca.Client = ci
nca.Subject = subject
nca.Reply = reply
- ca = &nca
+ ca = nca
}

eca := encodeAddConsumerAssignment(ca)
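The peer-set arithmetic above is the heart of the replica change: scaling up draws shuffled candidates from the stream's peer set that are not already consumer peers, and scaling down rotates the preferred leader to the end of the list before trimming from the front. A standalone, runnable sketch of both steps under illustrative names (note this sketch shuffles the entire candidate list, whereas the diff shuffles only the first rAfter entries):

package main

import (
	"fmt"
	"math/rand"
)

// scaleUp grows the consumer peer set toward rAfter replicas by drawing
// candidates from the stream peer set that are not yet members.
func scaleUp(consumerPeers, streamPeers []string, rAfter int) []string {
	candidates := append([]string(nil), streamPeers...)
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})
	for _, p := range candidates {
		if len(consumerPeers) == rAfter {
			break
		}
		found := false
		for _, cp := range consumerPeers {
			if cp == p {
				found = true
				break
			}
		}
		if !found {
			consumerPeers = append(consumerPeers, p)
		}
	}
	return consumerPeers
}

// scaleDown shrinks the peer set to rAfter replicas, preserving the
// preferred (current) leader by moving it to the end before trimming
// everything ahead of the last rAfter entries.
func scaleDown(peers []string, preferred string, rAfter int) []string {
	if preferred != "" {
		for i, p := range peers {
			if p == preferred {
				peers[i] = peers[len(peers)-1]
				peers[len(peers)-1] = p
			}
		}
	}
	return peers[len(peers)-rAfter:]
}

func main() {
	stream := []string{"n1", "n2", "n3"}
	fmt.Println(scaleUp([]string{"n1"}, stream, 3))             // e.g. [n1 n3 n2]
	fmt.Println(scaleDown([]string{"n1", "n2", "n3"}, "n1", 1)) // [n1]
}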
92 changes: 92 additions & 0 deletions server/jetstream_cluster_test.go
@@ -3670,6 +3670,98 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterScaleConsumer(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterTempl, "C", 3)
defer c.shutdown()

srv := c.randomNonLeader()
nc, js := jsClientConnect(t, srv)
defer nc.Close()

si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

durCfg := &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}
ci, err := js.AddConsumer("TEST", durCfg)
require_NoError(t, err)
require_True(t, ci.Config.Replicas == 0)

toSend := uint64(1_000)
for i := uint64(0); i < toSend; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}

s, err := js.PullSubscribe("foo", "DUR")
require_NoError(t, err)

consumeOne := func(expSeq uint64) error {
if ci, err := js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Delivered.Stream != expSeq {
return fmt.Errorf("pre: not expected delivered stream %d, got %d", expSeq, ci.Delivered.Stream)
} else if ci.Delivered.Consumer != expSeq {
return fmt.Errorf("pre: not expected delivered consumer %d, got %d", expSeq, ci.Delivered.Consumer)
} else if ci.AckFloor.Stream != expSeq {
return fmt.Errorf("pre: not expected ack stream %d, got %d", expSeq, ci.AckFloor.Stream)
} else if ci.AckFloor.Consumer != expSeq {
return fmt.Errorf("pre: not expected ack consumer %d, got %d", expSeq, ci.AckFloor.Consumer)
}
if m, err := s.Fetch(1); err != nil {
return err
} else if err := m[0].AckSync(); err != nil {
return err
}
expSeq = expSeq + 1
if ci, err := js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Delivered.Stream != expSeq {
return fmt.Errorf("post: not expected delivered stream %d, got %d", expSeq, ci.Delivered.Stream)
} else if ci.Delivered.Consumer != expSeq {
return fmt.Errorf("post: not expected delivered consumer %d, got %d", expSeq, ci.Delivered.Consumer)
} else if ci.AckFloor.Stream != expSeq {
return fmt.Errorf("post: not expected ack stream %d, got %d", expSeq, ci.AckFloor.Stream)
} else if ci.AckFloor.Consumer != expSeq {
return fmt.Errorf("post: not expected ack consumer %d, got %d", expSeq, ci.AckFloor.Consumer)
}
return nil
}

require_NoError(t, consumeOne(0))

// scale down, up, down and up to default == 3 again
for i, r := range []int{1, 3, 1, 0} {
durCfg.Replicas = r
if r == 0 {
r = si.Config.Replicas
}
js.UpdateConsumer("TEST", durCfg)

checkFor(t, time.Second*30, time.Millisecond*250, func() error {
if ci, err = js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("no leader")
} else if len(ci.Cluster.Replicas) != r-1 {
return fmt.Errorf("not enough replica, got %d wanted %d", len(ci.Cluster.Replicas), r-1)
} else {
for _, r := range ci.Cluster.Replicas {
if !r.Current || r.Offline || r.Lag != 0 {
return fmt.Errorf("replica %s not current %t offline %t lag %d", r.Name, r.Current, r.Offline, r.Lag)
}
}
}
return nil
})

require_NoError(t, consumeOne(uint64(i+1)))
}
}

func TestJetStreamClusterMoveCancel(t *testing.T) {
server := map[string]struct{}{}
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,