Skip to content

Commit

Permalink
[FIXED] LeafNode: propagation interest issue after a config reload
Browse files Browse the repository at this point in the history
When a configuration reload is done, the account's leaf node connections
were not transfered to the new instance of the account, causing the
interest to not be propagated until a leafnode reconnect or a server
restart.

Resolves #3009

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 20, 2022
1 parent 69ea1ab commit 730d892
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 99 deletions.
111 changes: 111 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5419,3 +5419,114 @@ func TestLeafNodeMinVersion(t *testing.T) {
// OK
}
}

func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
hubConf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
authorization: {
user: leaf
password: leaf
account: B
}
}
accounts: {
A: {
users = [{user: usrA, password: usrA}]
exports: [{stream: foo.*.>}]
}
B: {
imports: [{stream: {account: A, subject: foo.*.>}}]
}
}
`))
defer removeFile(t, hubConf)
hub, hubo := RunServerWithConfig(hubConf)
defer hub.Shutdown()

leafConfContet := fmt.Sprintf(`
port: -1
leafnodes {
remotes = [
{
url: "nats-leaf://leaf:leaf@127.0.0.1:%d"
account: B
}
]
}
accounts: {
B: {
exports: [{stream: foo.*.>}]
}
C: {
users: [{user: usrC, password: usrC}]
imports: [{stream: {account: B, subject: foo.bar.>}}]
}
}
`, hubo.LeafNode.Port)
leafConf := createConfFile(t, []byte(leafConfContet))
defer removeFile(t, leafConf)
leafo := LoadConfig(leafConf)
leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond
leaf := RunServer(leafo)
defer leaf.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)

subPubAndCheck := func() {
t.Helper()

ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()

fmt.Printf("@@IK: ------ sub to foo.*.baz -----\n")
// This will send an LS+ to the "hub" server.
sub, err := ncl.SubscribeSync("foo.*.baz")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
ncl.Flush()

ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncm.Close()

// Try a few times in case subject interest has not propagated yet
for i := 0; i < 5; i++ {
ncm.Publish("foo.bar.baz", []byte("msg"))
if _, err := sub.NextMsg(time.Second); err == nil {
// OK, done!
return
}
}
t.Fatal("Message was not received")
}
subPubAndCheck()

// Now cause a restart of the accepting side so that the leaf connection
// is recreated.
hub.Shutdown()
hub = RunServer(hubo)
defer hub.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)

subPubAndCheck()

// Issue a config reload even though we make no modification. There was
// a defect that caused the interest propagation to break.
// Set the ReconnectInterval to the default value so that reload does not complain.
leaf.getOpts().LeafNode.ReconnectInterval = DEFAULT_LEAF_NODE_RECONNECT
reloadUpdateConfig(t, leaf, leafConf, leafConfContet)

// Check again
subPubAndCheck()
}
2 changes: 2 additions & 0 deletions server/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,8 @@ func (s *Server) reloadAuthorization() {
newAcc.clients[c] = struct{}{}
}
}
// Same for leafnodes
newAcc.lleafs = append([]*client(nil), acc.lleafs...)

newAcc.sl = acc.sl
newAcc.rm = acc.rm
Expand Down
99 changes: 0 additions & 99 deletions test/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4422,105 +4422,6 @@ func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) {
}
}

func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
hubConf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
authorization: {
user: leaf
password: leaf
account: B
}
}
accounts: {
A: {
users = [{user: usrA, password: usrA}]
exports: [{stream: foo.*.>}]
}
B: {
imports: [{stream: {account: A, subject: foo.*.>}}]
}
}
`))
defer removeFile(t, hubConf)
hub, hubo := RunServerWithConfig(hubConf)
defer hub.Shutdown()

leafConf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes = [
{
url: "nats-leaf://leaf:leaf@127.0.0.1:%d"
account: B
}
]
}
accounts: {
B: {
exports: [{stream: foo.*.>}]
}
C: {
users: [{user: usrC, password: usrC}]
imports: [{stream: {account: B, subject: foo.bar.>}}]
}
}
`, hubo.LeafNode.Port)))
defer removeFile(t, leafConf)
leafo := LoadConfig(leafConf)
leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond
leaf := RunServer(leafo)
defer leaf.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)

ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()

// This will send an LS+ to the "hub" server.
sub, err := ncl.SubscribeSync("foo.*.baz")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
ncl.Flush()

pubAndCheck := func() {
t.Helper()
ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncm.Close()

// Try a few times in case subject interest has not propagated yet
for i := 0; i < 5; i++ {
ncm.Publish("foo.bar.baz", []byte("msg"))
if _, err := sub.NextMsg(time.Second); err == nil {
// OK, done!
return
}
}
t.Fatal("Message was not received")
}
pubAndCheck()

// Now cause a restart of the accepting side so that the leaf connection
// is recreated.
hub.Shutdown()
hub = RunServer(hubo)
defer hub.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)

pubAndCheck()
}

func TestLeafnodeHeaders(t *testing.T) {
srv, opts := runLeafServer()
defer srv.Shutdown()
Expand Down

0 comments on commit 730d892

Please sign in to comment.