Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close leafnode connection when same cluster name detected #3232

Merged
merged 1 commit into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const (
DuplicateClientID
DuplicateServerName
MinimumVersionRequired
ClusterNamesIdentical
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
)

// Some flags passed to processMsgResults
Expand Down
1 change: 1 addition & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2526,6 +2526,7 @@ func TestClientClampMaxSubsErrReport(t *testing.T) {
s1.SetLogger(l, false, false)

o2 := DefaultOptions()
o2.Cluster.Name = "xyz"
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", o1.LeafNode.Port))
o2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
s2 := RunServer(o2)
Expand Down
4 changes: 4 additions & 0 deletions server/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ var (
// attempted to connect to the leaf node listen port.
ErrClientConnectedToLeafNodePort = errors.New("attempted to connect to leaf node port")

// ErrLeafNodeHasSameClusterName represents an error condition when a remote leafnode
// reports the same cluster name as the hub cluster it is connecting to.
ErrLeafNodeHasSameClusterName = errors.New("remote leafnode has same cluster name")

// ErrConnectedToWrongPort represents an error condition when a connection is attempted
// to the wrong listen port (for instance a LeafNode to a client port, etc...)
ErrConnectedToWrongPort = errors.New("attempted to connect to wrong port")
Expand Down
19 changes: 18 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11194,7 +11194,7 @@ func TestJetStreamClusterConsumerAndStreamNamesWithPathSeparators(t *testing.T)
}

func TestJetStreamClusterFilteredMirrors(t *testing.T) {
c := createJetStreamClusterExplicit(t, "MSR", 5)
c := createJetStreamClusterExplicit(t, "MSR", 3)
defer c.shutdown()

// Client for API requests.
Expand Down Expand Up @@ -11250,5 +11250,22 @@ func TestJetStreamClusterFilteredMirrors(t *testing.T) {
require_True(t, meta.Sequence.Stream == sseq)
sseq += 3
}
}

// TestJetStreamClusterSameClusterLeafNodes makes sure that leafnode servers
// configured with the same cluster name ("SAME") as the cluster they solicit
// end up with no leafnode connections.
func TestJetStreamClusterSameClusterLeafNodes(t *testing.T) {
// Cluster that the leafnodes will solicit; it is named "SAME".
c := createJetStreamCluster(t, jsClusterAccountsTempl, "SAME", _EMPTY_, 3, 11233, true)
defer c.shutdown()

// Do by hand since by default we check for connections.
// NOTE(review): the trailing `false` presumably turns off the readiness/connection
// wait inside createJetStreamCluster — confirm against that helper.
tmpl := c.createLeafSolicit(jsClusterTemplWithLeafNode)
lc := createJetStreamCluster(t, tmpl, "SAME", "S-", 2, 22111, false)
defer lc.shutdown()

// Presumably gives the solicited leafnode connections time to be attempted
// before we assert on the count below.
time.Sleep(200 * time.Millisecond)

// Make sure no leafnodes are connected.
for _, s := range lc.servers {
checkLeafNodeConnectedCount(t, s, 0)
}
}
19 changes: 19 additions & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,22 @@ func createJetStreamSuperClusterWithTemplateAndModHook(t *testing.T, tmpl string
}

// createLeafNodes is a convenience wrapper around createLeafNodesWithDomain
// that passes an empty domain.
func (sc *supercluster) createLeafNodes(clusterName string, numServers int) *cluster {
sc.t.Helper()

// Delegate with an empty domain.
return sc.createLeafNodesWithDomain(clusterName, numServers, "")
}

// createLeafNodesWithDomain calls createLeafNodes on the cluster returned by
// sc.randomCluster(), forwarding clusterName, numServers and domain unchanged.
func (sc *supercluster) createLeafNodesWithDomain(clusterName string, numServers int, domain string) *cluster {
sc.t.Helper()

// Delegate to a randomly selected cluster of this supercluster.
return sc.randomCluster().createLeafNodes(clusterName, numServers, domain)
}

// createSingleLeafNode calls createLeafNode on the cluster returned by
// sc.randomCluster(), forwarding the extend flag, and returns the resulting server.
func (sc *supercluster) createSingleLeafNode(extend bool) *Server {
sc.t.Helper()

return sc.randomCluster().createLeafNode(extend)
}

Expand Down Expand Up @@ -828,14 +834,17 @@ var jsLeafFrag = `
`

// createLeafNodes is a convenience wrapper around
// createLeafNodesWithStartPortAndDomain that uses a fixed start port of 22111.
func (c *cluster) createLeafNodes(clusterName string, numServers int, domain string) *cluster {
c.t.Helper()
return c.createLeafNodesWithStartPortAndDomain(clusterName, numServers, 22111, domain)
}

// createLeafNodesNoJS is a convenience wrapper around
// createLeafNodesWithTemplateAndStartPort that uses the
// jsClusterTemplWithLeafNodeNoJS template and a fixed start port of 21333.
func (c *cluster) createLeafNodesNoJS(clusterName string, numServers int) *cluster {
c.t.Helper()
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNodeNoJS, clusterName, numServers, 21333)
}

func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numServers int, portStart int, domain string) *cluster {
c.t.Helper()
if domain == _EMPTY_ {
return c.createLeafNodesWithTemplateAndStartPort(jsClusterTemplWithLeafNode, clusterName, numServers, portStart)
}
Expand All @@ -844,6 +853,7 @@ func (c *cluster) createLeafNodesWithStartPortAndDomain(clusterName string, numS
}

func (c *cluster) createLeafNode(extend bool) *Server {
c.t.Helper()
if extend {
return c.createLeafNodeWithTemplate("LNS",
strings.ReplaceAll(jsClusterTemplWithSingleLeafNode, "store_dir:", " extension_hint: will_extend, store_dir:"))
Expand All @@ -853,6 +863,7 @@ func (c *cluster) createLeafNode(extend bool) *Server {
}

func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {
c.t.Helper()
tmpl := c.createLeafSolicit(template)
conf := fmt.Sprintf(tmpl, name, createDir(c.t, JetStreamStoreDir))
s, o := RunServerWithConfig(createConfFile(c.t, []byte(conf)))
Expand All @@ -863,6 +874,8 @@ func (c *cluster) createLeafNodeWithTemplate(name, template string) *Server {

// Helper to generate the leaf solicit configs.
func (c *cluster) createLeafSolicit(tmpl string) string {
c.t.Helper()

// Create our leafnode cluster template first.
var lns, lnss []string
for _, s := range c.servers {
Expand All @@ -880,6 +893,8 @@ func (c *cluster) createLeafSolicit(tmpl string) string {
}

func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName string, numJsServers, numNonServers int, doJSConfig bool) *cluster {
c.t.Helper()

// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
Expand All @@ -894,6 +909,8 @@ func (c *cluster) createLeafNodesWithTemplateMixedMode(template, clusterName str
}

func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName string, numServers int, portStart int) *cluster {
c.t.Helper()

// Create our leafnode cluster template first.
tmpl := c.createLeafSolicit(template)
pre := clusterName + "-"
Expand All @@ -906,6 +923,8 @@ func (c *cluster) createLeafNodesWithTemplateAndStartPort(template, clusterName

// Will add in the mapping for the account to each server.
func (c *cluster) addSubjectMapping(account, src, dest string) {
c.t.Helper()

for _, s := range c.servers {
if s.ClusterName() != c.name {
continue
Expand Down
19 changes: 18 additions & 1 deletion server/leafnode.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -49,6 +49,9 @@ const leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
// connection is closed and it won't attempt to reconnect for that long.
const leafNodeReconnectAfterPermViolation = 30 * time.Second

// When a soliciting leafnode connection is dropped because it has the same
// cluster name as the hub, this is the delay applied before it attempts to
// reconnect.
const leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

// Prefix for loop detection subject
const leafNodeLoopDetectionSubjectPrefix = "$LDS."

Expand Down Expand Up @@ -1382,6 +1385,13 @@ func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) erro
return err
}

// Check for cluster name collisions.
if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
c.closeConnection(ClusterNamesIdentical)
return ErrLeafNodeHasSameClusterName
}

// Reject if this has Gateway which means that it would be from a gateway
// connection that incorrectly connects to the leafnode port.
if proto.Gateway != _EMPTY_ {
Expand Down Expand Up @@ -2269,6 +2279,13 @@ func (c *client) leafPermViolation(pub bool, subj []byte) {

// Invoked from generic processErr() for LEAF connections.
func (c *client) leafProcessErr(errStr string) {
// Check if we got a cluster name collision.
if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
_, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
return
}

// We will look for Loop detected error coming from the other side.
// If we solicit, set the connect delay.
if !strings.Contains(errStr, "Loop detected") {
Expand Down
21 changes: 18 additions & 3 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ func TestLeafNodeAccountNotFound(t *testing.T) {
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port))

oa := DefaultOptions()
oa.Cluster.Name = "xyz"
oa.LeafNode.ReconnectInterval = 10 * time.Millisecond
oa.LeafNode.Remotes = []*RemoteLeafOpts{
{
Expand Down Expand Up @@ -498,6 +499,7 @@ func TestLeafNodeRTT(t *testing.T) {

lnBURL, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ob.LeafNode.Port))
oa := DefaultOptions()
oa.Cluster.Name = "xyz"
oa.PingInterval = 15 * time.Millisecond
oa.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{lnBURL}}}
sa := RunServer(oa)
Expand Down Expand Up @@ -562,6 +564,7 @@ func TestLeafNodeRTT(t *testing.T) {
// Now check that initial RTT is computed prior to first PingInterval
// Get new options to avoid possible race changing the ping interval.
ob = DefaultOptions()
ob.Cluster.Name = "xyz"
ob.PingInterval = time.Minute
ob.LeafNode.Host = "127.0.0.1"
ob.LeafNode.Port = -1
Expand Down Expand Up @@ -857,6 +860,7 @@ func TestLeafNodeLoop(t *testing.T) {
sa.SetLogger(l, false, false)

ob := DefaultOptions()
ob.Cluster.Name = "xyz"
ob.LeafNode.ReconnectInterval = 10 * time.Millisecond
ob.LeafNode.Port = 5678
ua, _ := url.Parse("nats://127.0.0.1:1234")
Expand Down Expand Up @@ -1262,6 +1266,7 @@ func TestLeafNodePermissions(t *testing.T) {

u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port))
lo2 := DefaultOptions()
lo2.Cluster.Name = "xyz"
lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond
lo2.LeafNode.connDelay = 100 * time.Millisecond
lo2.LeafNode.Remotes = []*RemoteLeafOpts{
Expand Down Expand Up @@ -1405,6 +1410,7 @@ func TestLeafNodePermissionsConcurrentAccess(t *testing.T) {

u, _ := url.Parse(fmt.Sprintf("nats://%s:%d", lo1.LeafNode.Host, lo1.LeafNode.Port))
lo2 := DefaultOptions()
lo2.Cluster.Name = "xyz"
lo2.LeafNode.ReconnectInterval = 5 * time.Millisecond
lo2.LeafNode.connDelay = 500 * time.Millisecond
lo2.LeafNode.Remotes = []*RemoteLeafOpts{
Expand Down Expand Up @@ -1674,6 +1680,7 @@ func TestLeafNodeTmpClients(t *testing.T) {
// Check with normal leafnode connection that once connected,
// the tmp map is also emptied.
bo := DefaultOptions()
bo.Cluster.Name = "xyz"
bo.LeafNode.ReconnectInterval = 5 * time.Millisecond
u, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ao.LeafNode.Port))
if err != nil {
Expand Down Expand Up @@ -1729,6 +1736,8 @@ func TestLeafNodeTLSVerifyAndMap(t *testing.T) {
defer s.Shutdown()

slo := DefaultOptions()
slo.Cluster.Name = "xyz"

sltlsc := &tls.Config{}
if test.provideCert {
tc := &TLSConfigOpts{
Expand Down Expand Up @@ -1969,7 +1978,7 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) {
remotes [ { url: "nats://127.0.0.1:%d" } ]
}
cluster {
name: "abc"
name: "xyz"
listen: "127.0.0.1:-1"
}
`, hopts.LeafNode.Port)))
Expand All @@ -1987,8 +1996,8 @@ func TestLeafNodeOriginClusterInfo(t *testing.T) {
checkLeafNodeConnected(t, s)

l = grabLeaf()
if rc := l.remoteCluster(); rc != "abc" {
t.Fatalf("Expected a remote cluster name of \"abc\", got %q", rc)
if rc := l.remoteCluster(); rc != "xyz" {
t.Fatalf("Expected a remote cluster name of \"xyz\", got %q", rc)
}
pcid := l.cid

Expand Down Expand Up @@ -2265,6 +2274,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {

oLeaf1 := DefaultOptions()
oLeaf1.ServerName = "leaf1"
oLeaf1.Cluster.Name = "xyz"
oLeaf1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
leaf1 := RunServer(oLeaf1)
defer leaf1.Shutdown()
Expand All @@ -2273,6 +2283,7 @@ func TestLeafNodeNoDuplicateWithinCluster(t *testing.T) {

oLeaf2 := DefaultOptions()
oLeaf2.ServerName = "leaf2"
oLeaf2.Cluster.Name = "xyz"
oLeaf2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
oLeaf2.Routes = RoutesFromStr(leaf1ClusterURL)
leaf2 := RunServer(oLeaf2)
Expand Down Expand Up @@ -2378,11 +2389,13 @@ func TestLeafNodeLMsgSplit(t *testing.T) {
remoteLeafs := []*RemoteLeafOpts{{URLs: []*url.URL{u1, u2}}}

oLeaf1 := DefaultOptions()
oLeaf1.Cluster.Name = "xyz"
oLeaf1.LeafNode.Remotes = remoteLeafs
leaf1 := RunServer(oLeaf1)
defer leaf1.Shutdown()

oLeaf2 := DefaultOptions()
oLeaf2.Cluster.Name = "xyz"
oLeaf2.LeafNode.Remotes = remoteLeafs
oLeaf2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", oLeaf1.Cluster.Port))
leaf2 := RunServer(oLeaf2)
Expand Down Expand Up @@ -2475,6 +2488,7 @@ func TestLeafNodeRouteParseLSUnsub(t *testing.T) {
remoteLeafs := []*RemoteLeafOpts{{URLs: []*url.URL{u2}}}

oLeaf2 := DefaultOptions()
oLeaf2.Cluster.Name = "xyz"
oLeaf2.LeafNode.Remotes = remoteLeafs
leaf2 := RunServer(oLeaf2)
defer leaf2.Shutdown()
Expand Down Expand Up @@ -3296,6 +3310,7 @@ func TestLeafNodeStreamImport(t *testing.T) {

o2 := DefaultOptions()
o2.LeafNode.Port = -1
o2.Cluster.Name = "xyz"

accB := NewAccount("B")
if err := accB.AddStreamExport(">", nil); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,8 @@ func (reason ClosedState) String() string {
return "Duplicate Server Name"
case MinimumVersionRequired:
return "Minimum Version Required"
case ClusterNamesIdentical:
return "Cluster Names Identical"
}

return "Unknown State"
Expand Down
2 changes: 2 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,13 +1828,15 @@ func TestReconnectErrorReports(t *testing.T) {

// Now try with leaf nodes
csOpts.Cluster.Port = 0
csOpts.Cluster.Name = _EMPTY_
csOpts.LeafNode.Host = "127.0.0.1"
csOpts.LeafNode.Port = -1

cs = RunServer(csOpts)
defer cs.Shutdown()

opts.Cluster.Port = 0
opts.Cluster.Name = _EMPTY_
opts.Routes = nil
u, _ := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", csOpts.LeafNode.Port))
opts.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{u}}}
Expand Down