Skip to content

Commit

Permalink
Close leafnode connection when same cluster name detected
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 30, 2022
1 parent 8a94e14 commit e6479da
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 5 deletions.
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const (
DuplicateClientID
DuplicateServerName
MinimumVersionRequired
ClusterNamesIdentical
)

// Some flags passed to processMsgResults
Expand Down
1 change: 1 addition & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,7 @@ func TestClientClampMaxSubsErrReport(t *testing.T) {
s1.SetLogger(l, false, false)

o2 := DefaultOptions()
o2.Cluster.Name = "xyz"
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s2 := RunServer(o2)
Expand Down
4 changes: 4 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ var (
// attempted to connect to the leaf node listen port.
ErrClientConnectedToLeafNodePort = errors.New("attempted to connect to leaf node port")

// ErrLeafNodeHasSameClusterName represents an error condition when a leafnode is a cluster
// and it has the same cluster name as the hub cluster.
ErrLeafNodeHasSameClusterName = errors.New("remote leafnode has same cluster name")

// ErrConnectedToWrongPort represents an error condition when a connection is attempted
// to the wrong listen port (for instance a LeafNode to a client port, etc...)
ErrConnectedToWrongPort = errors.New("attempted to connect to wrong port")
Expand Down
19 changes: 18 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11194,7 +11194,7 @@ func TestJetStreamClusterConsumerAndStreamNamesWithPathSeparators(t *testing.T)
}

func TestJetStreamClusterFilteredMirrors(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
c := createJetStreamClusterExplicit(t, "MSR", 3)
defer c.shutdown()

// Client for API requests.
Expand Down Expand Up @@ -11250,5 +11250,22 @@ func TestJetStreamClusterFilteredMirrors(t *testing.T) {
require_True(t, meta.Sequence.Stream == sseq)
sseq += 3
}
}

// Test for making sure we error on same cluster name.
func TestJetStreamClusterSameClusterLeafNodes(t *testing.T) {
c := createJetStreamCluster(t, jsClusterAccountsTempl, "SAME", _EMPTY_, 3, 11233, true)
defer c.shutdown()

// Do by hand since by default we check for connections.
tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode)
lc := createJetStreamCluster(t, tmpl, "SAME", "S-", 2, 22111, false)
defer lc.shutdown()

time.Sleep(200 * time.Millisecond)

// Make sure no leafnodes are connected.
for _, s := range lc.servers {
checkLeafNodeConnectedCount(t, s, 0)
}
}
19 changes: 19 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,22 @@ func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string
}

func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster {
sc.t.Helper()

// Create our leafnode cluster template first.
return sc.createLeafNodesWithDomain(clusterName, numServers, "")
}

func (sc *supercluster) createLeafNodesWithDomain(clusterName string, numServers int, domain string) *cluster {
sc.t.Helper()

// Create our leafnode cluster template first.
return sc.randomCluster().createLeafNodes(clusterName, numServers, domain)
}

func (sc *supercluster) createSingleLeafNode(extend bool) *Server {
sc.t.Helper()

return sc.randomCluster().createLeafNode(extend)
}

Expand Down Expand Up @@ -828,14 +834,17 @@ var jsLeafFrag = `
`

func (c *cluster) createLeafNodes(clusterName string, numServers int, domain string) *cluster {
c.t.Helper()
return c.createLeafNodesWithStartPortAndDomain(clusterName, numServers, 22111, domain)
}

func (c *cluster) createLeafNodesNoJS(clusterName string, numServers int) *cluster {
c.t.Helper()
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNodeNoJS, clusterName, numServers, 21333)
}

func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numServers int, portStart int, domain string) *cluster {
c.t.Helper()
if domain == _EMPTY_ {
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart)
}
Expand All @@ -844,6 +853,7 @@ func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numS
}

func (c *cluster) createLeafNode(extend bool) *Server {
c.t.Helper()
if extend {
return c.createLeafNodeWithTemplate("LNS",
strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", " extension_hint: will_extend, store_dir:"))
Expand All @@ -853,6 +863,7 @@ func (c *cluster) createLeafNode(extend bool) *Server {
}

func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {
c.t.Helper()
tmpl := c.createLeafSolicit(template)
conf := fmt.Sprintf(tmpl, name, createDir(c.t, JetStreamStoreDir))
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
Expand All @@ -863,6 +874,8 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {

// Helper to generate the leaf solicit configs.
func (c *cluster) createLeafSolicit(tmpl string) string {
c.t.Helper()

// Create our leafnode cluster template first.
var lns, lnss []string
for _, s := range c.servers {
Expand All @@ -880,6 +893,8 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
}

func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
c.t.Helper()

// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
Expand All @@ -894,6 +909,8 @@ func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName str
}

func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster {
c.t.Helper()

// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
Expand All @@ -906,6 +923,8 @@ func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName

// Will add in the mapping for the account to each server.
func (c *cluster) addSubjectMapping(account, src, dest string) {
c.t.Helper()

for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
Expand Down
19 changes: 18 additions & 1 deletion server/leafnode.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -49,6 +49,9 @@ const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
// connection is closed and it won't attempt to reconnect for that long.
const leafNodeReconnectAfterPermViolation = 30 * time.Second

// When we have the same cluster name as the hub.
const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

// Prefix for loop detection subject
const leafNodeLoopDetectionSubjectPrefix = "$LDS."

Expand Down Expand Up @@ -1382,6 +1385,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return err
}

// Check for cluster name collisions.
if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
c.closeConnection(ClusterNamesIdentical)
return ErrLeafNodeHasSameClusterName
}

// Reject if this has Gateway which means that it would be from a gateway
// connection that incorrectly connects to the leafnode port.
if proto.Gateway != _EMPTY_ {
Expand Down Expand Up @@ -2269,6 +2279,13 @@ func (c *client) leafPermViolation(pub bool, subj []byte) {

// Invoked from generic processErr() for LEAF connections.
func (c *client) leafProcessErr(errStr string) {
// Check if we got a cluster name collision.
if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
_, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
return
}

// We will look for Loop detected error coming from the other side.
// If we solicit, set the connect delay.
if !strings.Contains(errStr, "Loop detected") {
Expand Down
21 changes: 18 additions & 3 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func TestLeafNodeAccountNotFound(t *testing.T) {
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port))

oa := DefaultOptions()
oa.Cluster.Name = "xyz"
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Remotes = []*RemoteLeafOpts{
{
Expand Down Expand Up @@ -498,6 +499,7 @@ func TestLeafNodeRTT(t *testing.T) {

lnBURL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port))
oa := DefaultOptions()
oa.Cluster.Name = "xyz"
oa.PingInterval = 15 * time.Millisecond
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{lnBURL}}}
sa := RunServer(oa)
Expand Down Expand Up @@ -562,6 +564,7 @@ func TestLeafNodeRTT(t *testing.T) {
// Now check that initial RTT is computed prior to first PingInterval
// Get new options to avoid possible race changing the ping interval.
ob = DefaultOptions()
ob.Cluster.Name = "xyz"
ob.PingInterval = time.Minute
ob.LeafNode.Host = "127.0.0.1"
ob.LeafNode.Port = -1
Expand Down Expand Up @@ -857,6 +860,7 @@ func TestLeafNodeLoop(t *testing.T) {
sa.SetLogger(l, false, false)

ob := DefaultOptions()
ob.Cluster.Name = "xyz"
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
ua, _ := url.Parse("nats://127.0.0.1:1234")
Expand Down Expand Up @@ -1262,6 +1266,7 @@ func TestLeafNodePermissions(t *testing.T) {

u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port))
lo2 := DefaultOptions()
lo2.Cluster.Name = "xyz"
lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond
lo2.LeafNode.connDelay = 100 * time.Millisecond
lo2.LeafNode.Remotes = []*RemoteLeafOpts{
Expand Down Expand Up @@ -1405,6 +1410,7 @@ func TestLeafNodePermissionsConcurrentAccess(t *testing.T) {

u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port))
lo2 := DefaultOptions()
lo2.Cluster.Name = "xyz"
lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond
lo2.LeafNode.connDelay = 500 * time.Millisecond
lo2.LeafNode.Remotes = []*RemoteLeafOpts{
Expand Down Expand Up @@ -1674,6 +1680,7 @@ func TestLeafNodeTmpClients(t *testing.T) {
// Check with normal leafnode connection that once connected,
// the tmp map is also emptied.
bo := DefaultOptions()
bo.Cluster.Name = "xyz"
bo.LeafNode.ReconnectInterval = 5 * time.Millisecond
u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ao.LeafNode.Port))
if err != nil {
Expand Down Expand Up @@ -1729,6 +1736,8 @@ func TestLeafNodeTLSVerifyAndMap(t *testing.T) {
defer s.Shutdown()

slo := DefaultOptions()
slo.Cluster.Name = "xyz"

sltlsc := &tls.Config{}
if test.provideCert {
tc := &TLSConfigOpts{
Expand Down Expand Up @@ -1969,7 +1978,7 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) {
remotes [ { url: "nats://127.0.0.1:%d" } ]
}
cluster {
name: "abc"
name: "xyz"
listen: "127.0.0.1:-1"
}
`, hopts.LeafNode.Port)))
Expand All @@ -1987,8 +1996,8 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) {
checkLeafNodeConnected(t, s)

l = grabLeaf()
if rc := l.remoteCluster(); rc != "abc" {
t.Fatalf("Expected a remote cluster name of \"abc\", got %q", rc)
if rc := l.remoteCluster(); rc != "xyz" {
t.Fatalf("Expected a remote cluster name of \"xyz\", got %q", rc)
}
pcid := l.cid

Expand Down Expand Up @@ -2265,6 +2274,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {

oLeaf1 := DefaultOptions()
oLeaf1.ServerName = "leaf1"
oLeaf1.Cluster.Name = "xyz"
oLeaf1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
leaf1 := RunServer(oLeaf1)
defer leaf1.Shutdown()
Expand All @@ -2273,6 +2283,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {

oLeaf2 := DefaultOptions()
oLeaf2.ServerName = "leaf2"
oLeaf2.Cluster.Name = "xyz"
oLeaf2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
oLeaf2.Routes = RoutesFromStr(leaf1ClusterURL)
leaf2 := RunServer(oLeaf2)
Expand Down Expand Up @@ -2378,11 +2389,13 @@ func TestLeafNodeLMsgSplit(t *testing.T) {
remoteLeafs := []*RemoteLeafOpts{{URLs: []*url.URL{u1, u2}}}

oLeaf1 := DefaultOptions()
oLeaf1.Cluster.Name = "xyz"
oLeaf1.LeafNode.Remotes = remoteLeafs
leaf1 := RunServer(oLeaf1)
defer leaf1.Shutdown()

oLeaf2 := DefaultOptions()
oLeaf2.Cluster.Name = "xyz"
oLeaf2.LeafNode.Remotes = remoteLeafs
oLeaf2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port))
leaf2 := RunServer(oLeaf2)
Expand Down Expand Up @@ -2475,6 +2488,7 @@ func TestLeafNodeRouteParseLSUnsub(t *testing.T) {
remoteLeafs := []*RemoteLeafOpts{{URLs: []*url.URL{u2}}}

oLeaf2 := DefaultOptions()
oLeaf2.Cluster.Name = "xyz"
oLeaf2.LeafNode.Remotes = remoteLeafs
leaf2 := RunServer(oLeaf2)
defer leaf2.Shutdown()
Expand Down Expand Up @@ -3296,6 +3310,7 @@ func TestLeafNodeStreamImport(t *testing.T) {

o2 := DefaultOptions()
o2.LeafNode.Port = -1
o2.Cluster.Name = "xyz"

accB := NewAccount("B")
if err := accB.AddStreamExport(">", nil); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,8 @@ func (reason ClosedState) String() string {
return "Duplicate Server Name"
case MinimumVersionRequired:
return "Minimum Version Required"
case ClusterNamesIdentical:
return "Cluster Names Identical"
}

return "Unknown State"
Expand Down
2 changes: 2 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,13 +1828,15 @@ func TestReconnectErrorReports(t *testing.T) {

// Now try with leaf nodes
csOpts.Cluster.Port = 0
csOpts.Cluster.Name = _EMPTY_
csOpts.LeafNode.Host = "127.0.0.1"
csOpts.LeafNode.Port = -1

cs = RunServer(csOpts)
defer cs.Shutdown()

opts.Cluster.Port = 0
opts.Cluster.Name = _EMPTY_
opts.Routes = nil
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", csOpts.LeafNode.Port))
opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
Expand Down

0 comments on commit e6479da

Please sign in to comment.