Skip to content

Commit

Permalink
Close leafnode connection when same cluster name detected
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 30, 2022
1 parent 8a94e14 commit 5c1bd76
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 2 deletions.
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const (
DuplicateClientID
DuplicateServerName
MinimumVersionRequired
ClusterNamesIdentical
)

// Some flags passed to processMsgResults
Expand Down
4 changes: 4 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ var (
// attempted to connect to the leaf node listen port.
ErrClientConnectedToLeafNodePort = errors.New("attempted to connect to leaf node port")

// ErrLeafNodeHasSameClusterName represents an error condition when a leafnode is a cluster
// and it has the same cluster name as the hub cluster.
ErrLeafNodeHasSameClusterName = errors.New("remote leafnode has same cluster name")

// ErrConnectedToWrongPort represents an error condition when a connection is attempted
// to the wrong listen port (for instance a LeafNode to a client port, etc...)
ErrConnectedToWrongPort = errors.New("attempted to connect to wrong port")
Expand Down
19 changes: 18 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11194,7 +11194,7 @@ func TestJetStreamClusterConsumerAndStreamNamesWithPathSeparators(t *testing.T)
}

func TestJetStreamClusterFilteredMirrors(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
c := createJetStreamClusterExplicit(t, "MSR", 3)
defer c.shutdown()

// Client for API requests.
Expand Down Expand Up @@ -11250,5 +11250,22 @@ func TestJetStreamClusterFilteredMirrors(t *testing.T) {
require_True(t, meta.Sequence.Stream == sseq)
sseq += 3
}
}

// Test for making sure we error on same cluster name.
func TestJetStreamClusterSameClusterLeafNodes(t *testing.T) {
c := createJetStreamCluster(t, jsClusterAccountsTempl, "SAME", _EMPTY_, 3, 11233, true)
defer c.shutdown()

// Do by hand since by default we check for connections.
tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode)
lc := createJetStreamCluster(t, tmpl, "SAME", "S-", 2, 22111, false)
defer lc.shutdown()

time.Sleep(200 * time.Millisecond)

// Make sure no leafnodes are connected.
for _, s := range lc.servers {
checkLeafNodeConnectedCount(t, s, 0)
}
}
19 changes: 19 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,22 @@ func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string
}

func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster {
sc.t.Helper()

// Create our leafnode cluster template first.
return sc.createLeafNodesWithDomain(clusterName, numServers, "")
}

func (sc *supercluster) createLeafNodesWithDomain(clusterName string, numServers int, domain string) *cluster {
sc.t.Helper()

// Create our leafnode cluster template first.
return sc.randomCluster().createLeafNodes(clusterName, numServers, domain)
}

func (sc *supercluster) createSingleLeafNode(extend bool) *Server {
sc.t.Helper()

return sc.randomCluster().createLeafNode(extend)
}

Expand Down Expand Up @@ -828,14 +834,17 @@ var jsLeafFrag = `
`

func (c *cluster) createLeafNodes(clusterName string, numServers int, domain string) *cluster {
c.t.Helper()
return c.createLeafNodesWithStartPortAndDomain(clusterName, numServers, 22111, domain)
}

func (c *cluster) createLeafNodesNoJS(clusterName string, numServers int) *cluster {
c.t.Helper()
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNodeNoJS, clusterName, numServers, 21333)
}

func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numServers int, portStart int, domain string) *cluster {
c.t.Helper()
if domain == _EMPTY_ {
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart)
}
Expand All @@ -844,6 +853,7 @@ func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numS
}

func (c *cluster) createLeafNode(extend bool) *Server {
c.t.Helper()
if extend {
return c.createLeafNodeWithTemplate("LNS",
strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", " extension_hint: will_extend, store_dir:"))
Expand All @@ -853,6 +863,7 @@ func (c *cluster) createLeafNode(extend bool) *Server {
}

func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {
c.t.Helper()
tmpl := c.createLeafSolicit(template)
conf := fmt.Sprintf(tmpl, name, createDir(c.t, JetStreamStoreDir))
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
Expand All @@ -863,6 +874,8 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {

// Helper to generate the leaf solicit configs.
func (c *cluster) createLeafSolicit(tmpl string) string {
c.t.Helper()

// Create our leafnode cluster template first.
var lns, lnss []string
for _, s := range c.servers {
Expand All @@ -880,6 +893,8 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
}

func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
c.t.Helper()

// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
Expand All @@ -894,6 +909,8 @@ func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName str
}

func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster {
c.t.Helper()

// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
Expand All @@ -906,6 +923,8 @@ func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName

// Will add in the mapping for the account to each server.
func (c *cluster) addSubjectMapping(account, src, dest string) {
c.t.Helper()

for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
Expand Down
19 changes: 18 additions & 1 deletion server/leafnode.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -49,6 +49,9 @@ const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
// connection is closed and it won't attempt to reconnect for that long.
const leafNodeReconnectAfterPermViolation = 30 * time.Second

// When we have the same cluster name as the hub.
const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

// Prefix for loop detection subject
const leafNodeLoopDetectionSubjectPrefix = "$LDS."

Expand Down Expand Up @@ -1382,6 +1385,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return err
}

// Check for cluster name collisions.
if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
c.closeConnection(ClusterNamesIdentical)
return ErrLeafNodeHasSameClusterName
}

// Reject if this has Gateway which means that it would be from a gateway
// connection that incorrectly connects to the leafnode port.
if proto.Gateway != _EMPTY_ {
Expand Down Expand Up @@ -2269,6 +2279,13 @@ func (c *client) leafPermViolation(pub bool, subj []byte) {

// Invoked from generic processErr() for LEAF connections.
func (c *client) leafProcessErr(errStr string) {
// Check if we got a cluster name collision.
if strings.Contains(errStr, "same cluster name") {
_, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
return
}

// We will look for Loop detected error coming from the other side.
// If we solicit, set the connect delay.
if !strings.Contains(errStr, "Loop detected") {
Expand Down

0 comments on commit 5c1bd76

Please sign in to comment.