Skip to content

Commit

Permalink
Merge pull request #2871 from nats-io/tag_placement
Browse files Browse the repository at this point in the history
Allow stream placement by tags.
  • Loading branch information
derekcollison committed Feb 16, 2022
2 parents ae9bb19 + ca1132a commit ea20cb4
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 4 deletions.
8 changes: 7 additions & 1 deletion server/events.go
Expand Up @@ -162,6 +162,7 @@ type ServerInfo struct {
Cluster string `json:"cluster,omitempty"`
Domain string `json:"domain,omitempty"`
Version string `json:"ver"`
Tags []string `json:"tags,omitempty"`
Seq uint64 `json:"seq"`
JetStream bool `json:"jetstream"`
Time time.Time `json:"time"`
Expand Down Expand Up @@ -315,6 +316,9 @@ RESET:
}
s.mu.Unlock()

// Grab tags.
tags := s.getOpts().Tags

for s.eventsRunning() {
select {
case <-sendq.ch:
Expand All @@ -331,6 +335,7 @@ RESET:
pm.si.Version = VERSION
pm.si.Time = time.Now().UTC()
pm.si.JetStream = js
pm.si.Tags = tags
}
var b []byte
if pm.msg != nil {
Expand Down Expand Up @@ -1139,6 +1144,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
si.Cluster,
si.Domain,
si.ID,
si.Tags,
cfg,
stats,
false, si.JetStream,
Expand Down Expand Up @@ -1177,7 +1183,7 @@ func (s *Server) processNewServer(si *ServerInfo) {
node := string(getHash(si.Name))
// Only update if non-existent
if _, ok := s.nodeToInfo.Load(node); !ok {
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Version, si.Cluster, si.Domain, si.ID, nil, nil, false, si.JetStream})
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Version, si.Cluster, si.Domain, si.ID, si.Tags, nil, nil, false, si.JetStream})
}
}
// Announce ourselves..
Expand Down
21 changes: 20 additions & 1 deletion server/jetstream_cluster.go
Expand Up @@ -56,7 +56,7 @@ type jetStreamCluster struct {

// Used to guide placement of streams and meta controllers in clustered JetStream.
type Placement struct {
Cluster string `json:"cluster"`
Cluster string `json:"cluster,omitempty"`
Tags []string `json:"tags,omitempty"`
}

Expand Down Expand Up @@ -3680,6 +3680,12 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
maxBytes = uint64(cfg.MaxBytes)
}

// Check for tags.
var tags []string
if cfg.Placement != nil && len(cfg.Placement.Tags) > 0 {
tags = cfg.Placement.Tags
}

// Used for weighted sorting based on availability.
type wn struct {
id string
Expand Down Expand Up @@ -3720,6 +3726,19 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
}

if len(tags) > 0 {
matched := true
for _, t := range tags {
if !ni.tags.Contains(t) {
matched = false
break
}
}
if !matched {
continue
}
}

var available uint64
switch cfg.Storage {
case MemoryStorage:
Expand Down
82 changes: 82 additions & 0 deletions server/jetstream_cluster_test.go
Expand Up @@ -10585,6 +10585,88 @@ func TestJetStreamClusterStreamReplicaUpdates(t *testing.T) {
updateReplicas(1)
}

func TestJetStreamClusterStreamTagPlacement(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 4)
defer sc.shutdown()

reset := func(s *Server) {
s.mu.Lock()
s.sys.resetCh <- struct{}{}
s.mu.Unlock()
s.sendStatszUpdate()
}

// Make first cluster AWS, US country code.
for _, s := range sc.clusterForName("C1").servers {
opts := s.getOpts()
opts.Tags.Add("cloud:aws")
opts.Tags.Add("country:us")
reset(s)
}
// Make second cluster GCP, UK country code.
for _, s := range sc.clusterForName("C2").servers {
opts := s.getOpts()
opts.Tags.Add("cloud:gcp")
opts.Tags.Add("country:uk")
reset(s)
}
// Make third cluster AZ, JP country code.
for _, s := range sc.clusterForName("C3").servers {
opts := s.getOpts()
opts.Tags.Add("cloud:az")
opts.Tags.Add("country:jp")
reset(s)
}
// Make fourth cluster GCP, and SG country code.
for _, s := range sc.clusterForName("C4").servers {
opts := s.getOpts()
opts.Tags.Add("cloud:gcp")
opts.Tags.Add("country:sg")
reset(s)
}

placeOK := func(connectCluster string, tags []string, expectedCluster string) {
t.Helper()
nc, js := jsClientConnect(t, sc.clusterForName(connectCluster).randomServer())
defer nc.Close()
si, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Placement: &nats.Placement{Tags: tags},
})
require_NoError(t, err)
if si.Cluster.Name != expectedCluster {
t.Fatalf("Failed to place properly in %q, got %q", expectedCluster, si.Cluster.Name)
}
js.DeleteStream("TEST")
}

placeOK("C2", []string{"cloud:aws"}, "C1")
placeOK("C2", []string{"country:jp"}, "C3")
placeOK("C1", []string{"cloud:gcp", "country:uk"}, "C2")
placeOK("C2", []string{"cloud:gcp", "country:sg"}, "C4")

// Case shoud not matter.
placeOK("C1", []string{"cloud:GCP", "country:UK"}, "C2")
placeOK("C2", []string{"Cloud:Gcp", "Country:Sg"}, "C4")

placeErr := func(connectCluster string, tags []string) {
t.Helper()
nc, js := jsClientConnect(t, sc.clusterForName(connectCluster).randomServer())
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Placement: &nats.Placement{Tags: tags},
})
require_Error(t, err, NewJSInsufficientResourcesError())
}

placeErr("C1", []string{"cloud:GCP", "country:US"})
placeErr("C1", []string{"country:DN"})
placeErr("C1", []string{"cloud:DO"})
}

// Support functions

// Used to setup superclusters for tests.
Expand Down
2 changes: 1 addition & 1 deletion server/opts.go
Expand Up @@ -286,7 +286,7 @@ type Options struct {
ReconnectErrorReports int

// Tags describing the server. They will be included in varz
// and used as a filter criteria for some system requests
// and used as a filter criteria for some system requests.
Tags jwt.TagList `json:"-"`

// OCSPConfig enables OCSP Stapling in the server.
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Expand Up @@ -1420,7 +1420,7 @@ func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
// check to be consistent and future proof. but will be same domain
if s.sameDomain(info.Domain) {
s.nodeToInfo.Store(c.route.hash,
nodeInfo{c.route.remoteName, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, false, info.JetStream})
nodeInfo{c.route.remoteName, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream})
}
c.mu.Lock()
c.route.connectURLs = info.ClientConnectURLs
Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Expand Up @@ -287,6 +287,7 @@ type nodeInfo struct {
cluster string
domain string
id string
tags jwt.TagList
cfg *JetStreamConfig
stats *JetStreamStats
offline bool
Expand Down Expand Up @@ -412,6 +413,7 @@ func NewServer(opts *Options) (*Server, error) {
opts.Cluster.Name,
opts.JetStreamDomain,
info.ID,
opts.Tags,
&JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore},
nil,
false, true,
Expand Down

0 comments on commit ea20cb4

Please sign in to comment.