Skip to content

Commit

Permalink
[FIXED] LeafNode interest propagation with imports/exports
Browse files Browse the repository at this point in the history
When using subscriptions through import/exports, the server with
a leafnode connection would properly send the interest over, but
if the connection is recreated, this would not happen.

In case of JetStream where that happens under the cover, message
flow would stop after the leafnode restart because the subscriptions
would be created on recovery of the JetStream assets but *before*
the LeafNode connection could be established.

Resolves #3024
Resolves #3027
Resolves #3009

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 13, 2022
1 parent 37a3403 commit 1019b90
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 2 deletions.
2 changes: 1 addition & 1 deletion server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ func matchLiteral(literal, subject string) bool {
}

func addLocalSub(sub *subscription, subs *[]*subscription, includeLeafHubs bool) {
if sub != nil && sub.client != nil && sub.im == nil {
if sub != nil && sub.client != nil {
kind := sub.client.kind
if kind == CLIENT || kind == SYSTEM || kind == JETSTREAM || kind == ACCOUNT ||
(includeLeafHubs && sub.client.isHubLeafNode() /* implied kind==LEAF */) {
Expand Down
103 changes: 102 additions & 1 deletion test/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4303,7 +4303,9 @@ func TestLeafNodeAdvertiseInCluster(t *testing.T) {
expectNothing(t, lc)
}

func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
func TestLeafNodeAndGatewaysStreamAndShadowSubs(t *testing.T) {
server.SetGatewaysSolicitDelay(10 * time.Millisecond)
defer server.ResetGatewaysSolicitDelay()
conf1 := createConfFile(t, []byte(`
port: -1
system_account: SYS
Expand Down Expand Up @@ -4420,6 +4422,105 @@ func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
}
}

func TestLeafNodeStreamAndShadowSubs(t *testing.T) {
hubConf := createConfFile(t, []byte(`
port: -1
leafnodes {
port: -1
authorization: {
user: leaf
password: leaf
account: B
}
}
accounts: {
A: {
users = [{user: usrA, password: usrA}]
exports: [{stream: foo.*.>}]
}
B: {
imports: [{stream: {account: A, subject: foo.*.>}}]
}
}
`))
defer removeFile(t, hubConf)
hub, hubo := RunServerWithConfig(hubConf)
defer hub.Shutdown()

leafConf := createConfFile(t, []byte(fmt.Sprintf(`
port: -1
leafnodes {
remotes = [
{
url: "nats-leaf://leaf:leaf@127.0.0.1:%d"
account: B
}
]
}
accounts: {
B: {
exports: [{stream: foo.*.>}]
}
C: {
users: [{user: usrC, password: usrC}]
imports: [{stream: {account: B, subject: foo.bar.>}}]
}
}
`, hubo.LeafNode.Port)))
defer removeFile(t, leafConf)
leafo := LoadConfig(leafConf)
leafo.LeafNode.ReconnectInterval = 50 * time.Millisecond
leaf := RunServer(leafo)
defer leaf.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)

ncl, err := nats.Connect(leaf.ClientURL(), nats.UserInfo("usrC", "usrC"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncl.Close()

// This will send an LS+ to the "hub" server.
sub, err := ncl.SubscribeSync("foo.*.baz")
if err != nil {
t.Fatalf("Error subscribing: %v", err)
}
ncl.Flush()

pubAndCheck := func() {
t.Helper()
ncm, err := nats.Connect(hub.ClientURL(), nats.UserInfo("usrA", "usrA"))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
defer ncm.Close()

// Try a few times in case subject interest has not propagated yet
for i := 0; i < 5; i++ {
ncm.Publish("foo.bar.baz", []byte("msg"))
if _, err := sub.NextMsg(time.Second); err == nil {
// OK, done!
return
}
}
t.Fatal("Message was not received")
}
pubAndCheck()

// Now cause a restart of the accepting side so that the leaf connection
// is recreated.
hub.Shutdown()
hub = RunServer(hubo)
defer hub.Shutdown()

checkLeafNodeConnected(t, hub)
checkLeafNodeConnected(t, leaf)

pubAndCheck()
}

func TestLeafnodeHeaders(t *testing.T) {
srv, opts := runLeafServer()
defer srv.Shutdown()
Expand Down

0 comments on commit 1019b90

Please sign in to comment.