add implementation for consumer replica change (#3293)
* add implementation for consumer replica change

fixes #3262

also check peer list on every update

Signed-off-by: Matthias Hanel <mh@synadia.com>
matthiashanel committed Jul 27, 2022
1 parent 5f12c24 commit 3358205
Showing 3 changed files with 221 additions and 45 deletions.
8 changes: 7 additions & 1 deletion server/jetstream_api.go
@@ -3593,7 +3593,13 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
}

if isClustered && !req.Config.Direct {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
// If we are inline with client, we still may need to do a callout for consumer info
// during this call, so place in Go routine to not block client.
if c.kind != ROUTER && c.kind != GATEWAY {
go s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
} else {
s.jsClusteredConsumerRequest(ci, acc, subject, reply, rmsg, req.Stream, &req.Config)
}
return
}

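The hunk above only changes how the clustered consumer request is dispatched: when the request arrives on a direct client connection the work moves to a goroutine, because the update path may now block on a consumer-info callout, while route and gateway traffic keeps being handled inline. A minimal, self-contained sketch of that dispatch decision (illustrative names, not the server's actual types):

package main

import (
	"fmt"
	"sync"
)

type connKind int

const (
	clientConn connKind = iota
	routerConn
	gatewayConn
)

// dispatch runs process in a goroutine for direct client connections so a
// blocking callout cannot stall the client's read loop; internal route and
// gateway connections are served inline, mirroring the check above.
func dispatch(kind connKind, process func()) {
	if kind != routerConn && kind != gatewayConn {
		go process()
		return
	}
	process()
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	dispatch(clientConn, func() { defer wg.Done(); fmt.Println("handled asynchronously") })
	dispatch(routerConn, func() { defer wg.Done(); fmt.Println("handled inline") })
	wg.Wait()
}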
166 changes: 122 additions & 44 deletions server/jetstream_cluster.go
@@ -1878,6 +1878,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
case <-uch:
// keep stream assignment current
sa = mset.streamAssignment()
// keep peer list up to date with config
js.checkPeers(mset.raftGroup())
// We get this when we have a new stream assignment caused by an update.
// We want to know if we are migrating.
migrating := mset.isMigrating()
@@ -3739,7 +3741,8 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
// We get this when we have a new consumer assignment caused by an update.
// We want to know if we are migrating.
rg := o.raftGroup()

// keep peer list up to date with config
js.checkPeers(rg)
// If we are migrating, monitor for the new peers to be caught up.
if isLeader && len(rg.Peers) != o.replica() {
startMigrationMonitoring()
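Both monitor loops above now call js.checkPeers on an assignment update so the raft group's peer list stays in line with the possibly changed replica configuration. As a rough, assumed sketch of that kind of reconciliation (not the server's actual checkPeers implementation), the core step is a set difference between the assigned peers and the peers currently in the group:

// diffPeers is an illustrative helper (an assumption, not server code): it
// reports which peers would have to be added to or removed from a raft
// group so that its membership matches the assignment's peer list.
func diffPeers(assigned, current []string) (toAdd, toRemove []string) {
	want := make(map[string]struct{}, len(assigned))
	for _, p := range assigned {
		want[p] = struct{}{}
	}
	have := make(map[string]struct{}, len(current))
	for _, p := range current {
		have[p] = struct{}{}
		if _, ok := want[p]; !ok {
			toRemove = append(toRemove, p)
		}
	}
	for _, p := range assigned {
		if _, ok := have[p]; !ok {
			toAdd = append(toAdd, p)
		}
	}
	return toAdd, toRemove
}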
@@ -4764,6 +4767,57 @@ func (s *Server) jsClusteredStreamRequest(ci *ClientInfo, acc *Account, subject,
cc.meta.Propose(encodeAddStreamAssignment(sa))
}

var (
errReqTimeout = errors.New("timeout while waiting for response")
errReqSrvExit = errors.New("server shutdown while waiting for response")
)

// blocking utility call to perform requests on the system account
// returns (synchronized) v or error
func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) {
isubj := fmt.Sprintf(subjFormat, args...)
s.mu.Lock()
inbox := s.newRespInbox()
results := make(chan interface{}, 1)
// Store our handler.
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
if err := json.Unmarshal(msg, v); err != nil {
s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err)
return
}
select {
case results <- v:
default:
s.Warnf("Failed placing request response on internal channel")
}
}
s.mu.Unlock()

s.sendInternalMsgLocked(isubj, inbox, nil, nil)

const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

var err error
var data interface{}

select {
case <-s.quitCh:
err = errReqSrvExit
case <-notActive.C:
err = errReqTimeout
case data = <-results:
}
// Clean up here.
s.mu.Lock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
s.mu.Unlock()
return data, err
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
@@ -4891,48 +4945,11 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if !s.allPeersOffline(rg) {
// Need to release js lock.
js.mu.Unlock()
s.mu.Lock()
inbox := s.newRespInbox()
results := make(chan *StreamInfo, 1)
// Store our handler.
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
var si StreamInfo
if err := json.Unmarshal(msg, &si); err != nil {
s.Warnf("Error unmarshaling clustered stream info response:%v", err)
return
}
select {
case results <- &si:
default:
s.Warnf("Failed placing remote stream info result on internal channel")
}
}
s.mu.Unlock()

isubj := fmt.Sprintf(clusterStreamInfoT, ci.serviceAccount(), cfg.Name)
s.sendInternalMsgLocked(isubj, inbox, nil, nil)

const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

select {
case <-s.quitCh:
break
case <-notActive.C:
s.Warnf("Did not receive stream info results for '%s > %s'", acc, cfg.Name)
case si := <-results:
if si.Cluster != nil {
// The leader here is the server name, but need to convert to internal name.
curLeader = string(getHash(si.Cluster.Leader))
}
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
} else if cl := si.(*StreamInfo).Cluster; cl != nil {
curLeader = string(getHash(cl.Leader))
}
// Clean up here.
s.mu.Lock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
s.mu.Unlock()
// Re-acquire here.
js.mu.Lock()
}
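The hunk above replaces the inline inbox and reply handling with a call to the new sysRequest helper. Its calling contract, as used here and again for consumers further down: pass a pointer for the JSON response to be decoded into, and type-assert the returned interface{} back to that pointer type on success. A hedged usage sketch (account and stream names are placeholders):

// Illustrative caller only; "$G" and "ORDERS" are placeholder names.
if v, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, "$G", "ORDERS"); err != nil {
	s.Warnf("stream info request failed: %v", err)
} else if si := v.(*StreamInfo); si.Cluster != nil {
	// The leader reported here is the server name, not the internal hash.
	s.Debugf("current stream leader: %s", si.Cluster.Leader)
}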
@@ -5807,13 +5824,74 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
Created: time.Now().UTC(),
}
} else {
nca := ca.copyGroup()

rBefore := ca.Config.replicas(sa.Config)
rAfter := cfg.replicas(sa.Config)

var curLeader string
if rBefore != rAfter {
// We are modifying nodes here. We want to do our best to preserve the current leader.
// We have support now from above that guarantees we are in our own Go routine, so can
// ask for stream info from the stream leader to make sure we keep the leader in the new list.
if !s.allPeersOffline(ca.Group) {
// Need to release js lock.
js.mu.Unlock()
if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err)
} else if cl := ci.(*ConsumerInfo).Cluster; cl != nil {
curLeader = string(getHash(cl.Leader))
}
// Re-acquire here.
js.mu.Lock()
}
}

if rBefore < rAfter {
newPeerSet := nca.Group.Peers
// scale up by adding new members from the stream peer set that are not yet in the consumer peer set
streamPeerSet := copyStrings(sa.Group.Peers)
rand.Shuffle(rAfter, func(i, j int) { streamPeerSet[i], streamPeerSet[j] = streamPeerSet[j], streamPeerSet[i] })
for _, p := range streamPeerSet {
found := false
for _, sp := range newPeerSet {
if sp == p {
found = true
break
}
}
if !found {
newPeerSet = append(newPeerSet, p)
if len(newPeerSet) == rAfter {
break
}
}
}
nca.Group.Peers = newPeerSet
nca.Group.Preferred = curLeader
} else if rBefore > rAfter {
newPeerSet := nca.Group.Peers
// mark leader preferred and move it to end
nca.Group.Preferred = curLeader
if nca.Group.Preferred != _EMPTY_ {
for i, p := range newPeerSet {
if nca.Group.Preferred == p {
newPeerSet[i] = newPeerSet[len(newPeerSet)-1]
newPeerSet[len(newPeerSet)-1] = p
}
}
}
// scale down by removing peers from the end
newPeerSet = newPeerSet[len(newPeerSet)-rAfter:]
nca.Group.Peers = newPeerSet
}

// Update config and client info on copy of existing.
nca := *ca
nca.Config = cfg
nca.Client = ci
nca.Subject = subject
nca.Reply = reply
ca = &nca
ca = nca
}

eca := encodeAddConsumerAssignment(ca)
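Taken together, the scale-up and scale-down branches above are a peer-set selection step: growing the set fills it with randomly chosen stream peers the consumer does not already use, while shrinking it moves the preferred (current) leader to the end and keeps the tail so the leader survives the cut. A standalone sketch of that selection on plain string slices (illustrative only, not the server code; requires math/rand):

// selectPeers returns a peer set of size rAfter derived from the consumer's
// current peers and the owning stream's peers, preserving leader if set.
func selectPeers(current, streamPeers []string, rAfter int, leader string) []string {
	peers := append([]string(nil), current...)
	if len(peers) < rAfter {
		// Scale up: append stream peers not yet present, in random order,
		// until the desired replica count is reached.
		shuffled := append([]string(nil), streamPeers...)
		rand.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
		for _, p := range shuffled {
			if len(peers) == rAfter {
				break
			}
			if !containsPeer(peers, p) {
				peers = append(peers, p)
			}
		}
		return peers
	}
	// Scale down: move the leader to the end, then keep the last rAfter peers.
	if leader != "" {
		for i, p := range peers {
			if p == leader {
				peers[i], peers[len(peers)-1] = peers[len(peers)-1], peers[i]
				break
			}
		}
	}
	return peers[len(peers)-rAfter:]
}

func containsPeer(peers []string, p string) bool {
	for _, v := range peers {
		if v == p {
			return true
		}
	}
	return false
}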
92 changes: 92 additions & 0 deletions server/jetstream_cluster_test.go
@@ -3670,6 +3670,98 @@ func TestJetStreamClusterPeerExclusionTag(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterScaleConsumer(t *testing.T) {
c := createJetStreamClusterWithTemplate(t, jsClusterTempl, "C", 3)
defer c.shutdown()

srv := c.randomNonLeader()
nc, js := jsClientConnect(t, srv)
defer nc.Close()

si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

durCfg := &nats.ConsumerConfig{Durable: "DUR", AckPolicy: nats.AckExplicitPolicy}
ci, err := js.AddConsumer("TEST", durCfg)
require_NoError(t, err)
require_True(t, ci.Config.Replicas == 0)

toSend := uint64(1_000)
for i := uint64(0); i < toSend; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}

s, err := js.PullSubscribe("foo", "DUR")
require_NoError(t, err)

consumeOne := func(expSeq uint64) error {
if ci, err := js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Delivered.Stream != expSeq {
return fmt.Errorf("pre: not expected delivered stream %d, got %d", expSeq, ci.Delivered.Stream)
} else if ci.Delivered.Consumer != expSeq {
return fmt.Errorf("pre: not expected delivered consumer %d, got %d", expSeq, ci.Delivered.Consumer)
} else if ci.AckFloor.Stream != expSeq {
return fmt.Errorf("pre: not expected ack stream %d, got %d", expSeq, ci.AckFloor.Stream)
} else if ci.AckFloor.Consumer != expSeq {
return fmt.Errorf("pre: not expected ack consumer %d, got %d", expSeq, ci.AckFloor.Consumer)
}
if m, err := s.Fetch(1); err != nil {
return err
} else if err := m[0].AckSync(); err != nil {
return err
}
expSeq = expSeq + 1
if ci, err := js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Delivered.Stream != expSeq {
return fmt.Errorf("post: not expected delivered stream %d, got %d", expSeq, ci.Delivered.Stream)
} else if ci.Delivered.Consumer != expSeq {
return fmt.Errorf("post: not expected delivered consumer %d, got %d", expSeq, ci.Delivered.Consumer)
} else if ci.AckFloor.Stream != expSeq {
return fmt.Errorf("post: not expected ack stream %d, got %d", expSeq, ci.AckFloor.Stream)
} else if ci.AckFloor.Consumer != expSeq {
return fmt.Errorf("post: not expected ack consumer %d, got %d", expSeq, ci.AckFloor.Consumer)
}
return nil
}

require_NoError(t, consumeOne(0))

// scale down, up, down and up to default == 3 again
for i, r := range []int{1, 3, 1, 0} {
durCfg.Replicas = r
if r == 0 {
r = si.Config.Replicas
}
js.UpdateConsumer("TEST", durCfg)

checkFor(t, time.Second*30, time.Millisecond*250, func() error {
if ci, err = js.ConsumerInfo("TEST", "DUR"); err != nil {
return err
} else if ci.Cluster.Leader == _EMPTY_ {
return fmt.Errorf("no leader")
} else if len(ci.Cluster.Replicas) != r-1 {
return fmt.Errorf("not enough replica, got %d wanted %d", len(ci.Cluster.Replicas), r-1)
} else {
for _, r := range ci.Cluster.Replicas {
if !r.Current || r.Offline || r.Lag != 0 {
return fmt.Errorf("replica %s not current %t offline %t lag %d", r.Name, r.Current, r.Offline, r.Lag)
}
}
}
return nil
})

require_NoError(t, consumeOne(uint64(i+1)))
}
}
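For reference, the client-facing operation this test exercises is simply a consumer update with a new replica count through the nats.go JetStream API. A hedged sketch with placeholder stream and durable names, assuming a running clustered JetStream:

// Hedged client-side sketch; "TEST" and "DUR" are placeholder names.
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatal(err)
}
defer nc.Close()
js, _ := nc.JetStream()
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
	Durable:   "DUR",
	AckPolicy: nats.AckExplicitPolicy,
	Replicas:  1, // scale the durable consumer down to a single replica
})
if err != nil {
	log.Fatal(err)
}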

func TestJetStreamClusterMoveCancel(t *testing.T) {
server := map[string]struct{}{}
sc := createJetStreamSuperClusterWithTemplateAndModHook(t, jsClusterTempl, 4, 2,