Skip to content

Commit

Permalink
[FIXED] LeafNode: queue sub interest not properly sent to new LN
Browse files Browse the repository at this point in the history
In complex situations, the queue member counts across the various
servers may not be properly accounted for when they are sent to a
new leafnode connection.

The new test TestLeafNodeQueueGroupWithLateLNJoin has a drawing
of such a setup: after LN1 joined and queue members were then
removed until only one was left, LN1 was told that there was no
more interest, so a message published to LN1 would not reach
the remaining queue sub connected to LN2.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Mar 5, 2022
1 parent 77bce19 commit 1e53d81
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 1 deletion.
6 changes: 5 additions & 1 deletion server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
// Also don't add the subscription if it has a origin cluster and the
// cluster name matches the one of the client we are sending to.
if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
c.leaf.smap[keyFromSub(sub)]++
count := int32(1)
if len(sub.queue) > 0 && sub.qw > 0 {
count = sub.qw
}
c.leaf.smap[keyFromSub(sub)] += count
if c.leaf.tsub == nil {
c.leaf.tsub = make(map[*subscription]struct{})
}
Expand Down
138 changes: 138 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5045,3 +5045,141 @@ default_js_domain: {B:"DHUB"}
require_Equal(t, si.Cluster.Name, "HUB")

}

// TestLeafNodeQueueGroupWithLateLNJoin verifies that a leafnode that joins
// after queue subscriptions already exist across a hub cluster and a leafnode
// cluster still has interest once all but one queue member are removed: a
// message published through the late leafnode must reach the one remaining
// queue subscriber.
func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
	/*

		Topology: A cluster of leafnodes LN2 and LN3, connect
		to a cluster C1, C2.

		sub(foo)     sub(foo)
		    \         /
		    C1  <->  C2
		    ^        ^
		    |        |
		    LN2 <-> LN3
		    /         \
		sub(foo)     sub(foo)

		Once the above is set, start LN1 that connects to C1.

		      sub(foo)     sub(foo)
		          \         /
		LN1  ->   C1  <->  C2
		          ^        ^
		          |        |
		          LN2 <-> LN3
		          /         \
		      sub(foo)     sub(foo)

		Remove subs to LN3, C2 and C1.

		LN1  ->   C1  <->  C2
		          ^        ^
		          |        |
		          LN2 <-> LN3
		          /
		      sub(foo)

		Publish from LN1 and verify message is received by sub on LN2.

		pub(foo)
		   \
		   LN1  ->   C1  <->  C2
		             ^        ^
		             |        |
		             LN2 <-> LN3
		             /
		         sub(foo)

	*/

	// Hub cluster "ngs": C1 and C2, both accepting leafnode connections.
	// NOTE(review): the ports are set to -1 and read back from the options
	// below, so this presumably relies on RunServer storing the actual
	// bound ports in the options - confirm.
	co1 := DefaultOptions()
	co1.LeafNode.Host = "127.0.0.1"
	co1.LeafNode.Port = -1
	co1.Cluster.Name = "ngs"
	co1.Cluster.Host = "127.0.0.1"
	co1.Cluster.Port = -1
	c1 := RunServer(co1)
	defer c1.Shutdown()

	co2 := DefaultOptions()
	co2.LeafNode.Host = "127.0.0.1"
	co2.LeafNode.Port = -1
	co2.Cluster.Name = "ngs"
	co2.Cluster.Host = "127.0.0.1"
	co2.Cluster.Port = -1
	// C2 routes to C1 to form the hub cluster.
	co2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", co1.Cluster.Port))
	c2 := RunServer(co2)
	defer c2.Shutdown()

	checkClusterFormed(t, c1, c2)

	// Leafnode cluster "local": LN2 has a remote to C1, LN3 has a remote to C2.
	lo2 := DefaultOptions()
	lo2.Cluster.Name = "local"
	lo2.Cluster.Host = "127.0.0.1"
	lo2.Cluster.Port = -1
	lo2.LeafNode.ReconnectInterval = 50 * time.Millisecond
	lo2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co1.LeafNode.Port)}}}}
	ln2 := RunServer(lo2)
	defer ln2.Shutdown()

	lo3 := DefaultOptions()
	lo3.Cluster.Name = "local"
	lo3.Cluster.Host = "127.0.0.1"
	lo3.Cluster.Port = -1
	// LN3 routes to LN2 to form the leafnode cluster.
	lo3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo2.Cluster.Port))
	lo3.LeafNode.ReconnectInterval = 50 * time.Millisecond
	lo3.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co2.LeafNode.Port)}}}}
	ln3 := RunServer(lo3)
	defer ln3.Shutdown()

	checkClusterFormed(t, ln2, ln3)
	checkLeafNodeConnected(t, ln2)
	checkLeafNodeConnected(t, ln3)

	// One member of queue group "qgroup" on subject "foo" per server:
	// LN2, LN3, C1 and C2.
	cln2 := natsConnect(t, ln2.ClientURL())
	defer cln2.Close()
	sln2 := natsQueueSubSync(t, cln2, "foo", "qgroup")
	natsFlush(t, cln2)

	cln3 := natsConnect(t, ln3.ClientURL())
	defer cln3.Close()
	sln3 := natsQueueSubSync(t, cln3, "foo", "qgroup")
	natsFlush(t, cln3)

	cc1 := natsConnect(t, c1.ClientURL())
	defer cc1.Close()
	sc1 := natsQueueSubSync(t, cc1, "foo", "qgroup")
	natsFlush(t, cc1)

	cc2 := natsConnect(t, c2.ClientURL())
	defer cc2.Close()
	sc2 := natsQueueSubSync(t, cc2, "foo", "qgroup")
	natsFlush(t, cc2)

	// The interest on "foo" must be known by all four servers before the
	// late leafnode is started.
	checkSubInterest(t, c1, globalAccountName, "foo", time.Second)
	checkSubInterest(t, c2, globalAccountName, "foo", time.Second)
	checkSubInterest(t, ln2, globalAccountName, "foo", time.Second)
	checkSubInterest(t, ln3, globalAccountName, "foo", time.Second)

	// Late joiner: LN1 is a standalone leafnode with a remote to C1.
	lo1 := DefaultOptions()
	lo1.LeafNode.ReconnectInterval = 50 * time.Millisecond
	lo1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co1.LeafNode.Port)}}}}
	ln1 := RunServer(lo1)
	defer ln1.Shutdown()

	checkLeafNodeConnected(t, ln1)
	checkSubInterest(t, ln1, globalAccountName, "foo", time.Second)

	// Fail right away if an unsubscribe does not succeed: otherwise the
	// test would only fail later, with a misleading timeout while waiting
	// for the message on LN2.
	mustUnsub := func(sub interface{ Unsubscribe() error }) {
		t.Helper()
		if err := sub.Unsubscribe(); err != nil {
			t.Fatalf("Error on unsubscribe: %v", err)
		}
	}

	// Remove the queue members on LN3, C2 and C1, leaving only the one
	// attached to LN2.
	mustUnsub(sln3)
	natsFlush(t, cln3)
	mustUnsub(sc2)
	natsFlush(t, cc2)
	mustUnsub(sc1)
	natsFlush(t, cc1)

	cln1 := natsConnect(t, ln1.ClientURL())
	defer cln1.Close()

	// A message published through LN1 must still reach the remaining
	// queue member on LN2.
	natsPub(t, cln1, "foo", []byte("hello"))
	natsNexMsg(t, sln2, time.Second)
}

0 comments on commit 1e53d81

Please sign in to comment.