From 1e53d81cb347a8d679f139b218507feb23e49e3b Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Fri, 4 Mar 2022 17:03:06 -0700
Subject: [PATCH] [FIXED] LeafNode: queue sub interest not properly sent to new LN

In complex situations, the queue member count across the various servers
may not be properly accounted for when sent to a new leafnode connection.

The new test TestLeafNodeQueueGroupWithLateLNJoin has a drawing of such a
setup: after LN1 joined, queue members were removed until only one was
left, yet LN1 was told that there was no more interest, so a message
published from LN1 would not reach the remaining queue sub connected to
LN2.

Signed-off-by: Ivan Kozlovic
---
 server/leafnode.go      |   6 +-
 server/leafnode_test.go | 138 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 143 insertions(+), 1 deletion(-)

diff --git a/server/leafnode.go b/server/leafnode.go
index 7d0acdeac0..e93eb60050 100644
--- a/server/leafnode.go
+++ b/server/leafnode.go
@@ -1546,7 +1546,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
 		// Also don't add the subscription if it has a origin cluster and the
 		// cluster name matches the one of the client we are sending to.
 		if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
-			c.leaf.smap[keyFromSub(sub)]++
+			count := int32(1)
+			if len(sub.queue) > 0 && sub.qw > 0 {
+				count = sub.qw
+			}
+			c.leaf.smap[keyFromSub(sub)] += count
 			if c.leaf.tsub == nil {
 				c.leaf.tsub = make(map[*subscription]struct{})
 			}
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 4beef433c2..6ab7c38fb3 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -5045,3 +5045,141 @@ default_js_domain: {B:"DHUB"}
 
 	require_Equal(t, si.Cluster.Name, "HUB")
 }
+
+func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
+	/*
+
+	     Topology: A cluster of leafnodes, LN2 and LN3, connects
+	     to a cluster of C1 and C2.
+
+	     sub(foo)      sub(foo)
+	          \          /
+	          C1  <->  C2
+	          ^        ^
+	          |        |
+	         LN2  <->  LN3
+	         /            \
+	     sub(foo)      sub(foo)
+
+	     Once the above is set, start LN1, which connects to C1.
+
+	     sub(foo)      sub(foo)
+	          \          /
+	   LN1 -> C1  <->  C2
+	          ^        ^
+	          |        |
+	         LN2  <->  LN3
+	         /            \
+	     sub(foo)      sub(foo)
+
+	     Remove the subs on LN3, C2 and C1.
+
+	   LN1 -> C1  <->  C2
+	          ^        ^
+	          |        |
+	         LN2  <->  LN3
+	         /
+	     sub(foo)
+
+	     Publish from LN1 and verify the message is received by the sub on LN2.
+ + pub(foo) + \ + LN1 -> C1 <-> C2 + ^ ^ + | | + LN2 <-> LN3 + / + sub(foo) + */ + co1 := DefaultOptions() + co1.LeafNode.Host = "127.0.0.1" + co1.LeafNode.Port = -1 + co1.Cluster.Name = "ngs" + co1.Cluster.Host = "127.0.0.1" + co1.Cluster.Port = -1 + c1 := RunServer(co1) + defer c1.Shutdown() + + co2 := DefaultOptions() + co2.LeafNode.Host = "127.0.0.1" + co2.LeafNode.Port = -1 + co2.Cluster.Name = "ngs" + co2.Cluster.Host = "127.0.0.1" + co2.Cluster.Port = -1 + co2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", co1.Cluster.Port)) + c2 := RunServer(co2) + defer c2.Shutdown() + + checkClusterFormed(t, c1, c2) + + lo2 := DefaultOptions() + lo2.Cluster.Name = "local" + lo2.Cluster.Host = "127.0.0.1" + lo2.Cluster.Port = -1 + lo2.LeafNode.ReconnectInterval = 50 * time.Millisecond + lo2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co1.LeafNode.Port)}}}} + ln2 := RunServer(lo2) + defer ln2.Shutdown() + + lo3 := DefaultOptions() + lo3.Cluster.Name = "local" + lo3.Cluster.Host = "127.0.0.1" + lo3.Cluster.Port = -1 + lo3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo2.Cluster.Port)) + lo3.LeafNode.ReconnectInterval = 50 * time.Millisecond + lo3.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co2.LeafNode.Port)}}}} + ln3 := RunServer(lo3) + defer ln3.Shutdown() + + checkClusterFormed(t, ln2, ln3) + checkLeafNodeConnected(t, ln2) + checkLeafNodeConnected(t, ln3) + + cln2 := natsConnect(t, ln2.ClientURL()) + defer cln2.Close() + sln2 := natsQueueSubSync(t, cln2, "foo", "qgroup") + natsFlush(t, cln2) + + cln3 := natsConnect(t, ln3.ClientURL()) + defer cln3.Close() + sln3 := natsQueueSubSync(t, cln3, "foo", "qgroup") + natsFlush(t, cln3) + + cc1 := natsConnect(t, c1.ClientURL()) + defer cc1.Close() + sc1 := natsQueueSubSync(t, cc1, "foo", "qgroup") + natsFlush(t, cc1) + + cc2 := natsConnect(t, c2.ClientURL()) + defer cc2.Close() + sc2 := natsQueueSubSync(t, cc2, "foo", "qgroup") + natsFlush(t, cc2) + + checkSubInterest(t, c1, globalAccountName, "foo", time.Second) + checkSubInterest(t, c2, globalAccountName, "foo", time.Second) + checkSubInterest(t, ln2, globalAccountName, "foo", time.Second) + checkSubInterest(t, ln3, globalAccountName, "foo", time.Second) + + lo1 := DefaultOptions() + lo1.LeafNode.ReconnectInterval = 50 * time.Millisecond + lo1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co1.LeafNode.Port)}}}} + ln1 := RunServer(lo1) + defer ln1.Shutdown() + + checkLeafNodeConnected(t, ln1) + checkSubInterest(t, ln1, globalAccountName, "foo", time.Second) + + sln3.Unsubscribe() + natsFlush(t, cln3) + sc2.Unsubscribe() + natsFlush(t, cc2) + sc1.Unsubscribe() + natsFlush(t, cc1) + + cln1 := natsConnect(t, ln1.ClientURL()) + defer cln1.Close() + + natsPub(t, cln1, "foo", []byte("hello")) + natsNexMsg(t, sln2, time.Second) +}
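
Note: the essence of the accounting fix can be illustrated outside the
server with a minimal, self-contained Go sketch. Everything below is a
hypothetical simplification for illustration (the subscription struct,
keyFromSub and buildSmap here are stand-ins, not the actual nats-server
types; only the count/qw logic mirrors the patched code in
initLeafNodeSmapAndSendSubs): counting a queue sub as 1 instead of its
aggregated weight lets later per-member unsubscribes drive the interest
count to zero while a member remains.

package main

import "fmt"

// Simplified stand-in for the server's subscription: a routed or leaf
// queue sub carries a queue weight (qw) aggregating the number of queue
// members it represents on the remote side. Hypothetical type, not the
// real one.
type subscription struct {
	subject string
	queue   string
	qw      int32
}

// keyFromSub-style key: subject, plus the queue name for queue subs.
func keyFromSub(sub *subscription) string {
	if sub.queue != "" {
		return sub.subject + " " + sub.queue
	}
	return sub.subject
}

// buildSmap mimics the per-leafnode smap accounting. With weighted=false
// it reproduces the bug (every sub counts as 1); with weighted=true it
// applies the fix (a queue sub counts as its aggregated weight).
func buildSmap(subs []*subscription, weighted bool) map[string]int32 {
	smap := make(map[string]int32)
	for _, sub := range subs {
		count := int32(1)
		if weighted && sub.queue != "" && sub.qw > 0 {
			count = sub.qw
		}
		smap[keyFromSub(sub)] += count
	}
	return smap
}

func main() {
	// One aggregated sub standing in for 3 queue members, as a route's
	// queue sub does when several members live behind it (e.g. the
	// qgroup subs reachable through C2 in the test's topology).
	subs := []*subscription{{subject: "foo", queue: "qgroup", qw: 3}}

	for _, weighted := range []bool{false, true} {
		smap := buildSmap(subs, weighted)
		key := "foo qgroup"
		// Two of the three members unsubscribe after the new leafnode
		// (LN1) was initialized; each removal decrements the count by 1.
		smap[key] -= 2
		fmt.Printf("weighted=%v: remaining interest=%d (interest dropped if <= 0)\n",
			weighted, smap[key])
	}
}

Run as written, the sketch prints a remaining interest of -1 with the
unweighted accounting and 1 with the weighted one: the difference that,
in the new test, decides whether LN1 keeps forwarding messages to the
last queue member on LN2.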