Skip to content

Commit

Permalink
Merge pull request #2901 from nats-io/leaf_qgroup_weight
Browse files Browse the repository at this point in the history
[FIXED] LeafNode: queue sub interest not properly sent to new LN
  • Loading branch information
kozlovic committed Mar 5, 2022
2 parents adfcfdb + 1e53d81 commit 52e3b4b
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 1 deletion.
6 changes: 5 additions & 1 deletion server/leafnode.go
Expand Up @@ -1546,7 +1546,11 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
// Also don't add the subscription if it has a origin cluster and the
// cluster name matches the one of the client we are sending to.
if c != sub.client && (sub.origin == nil || (string(sub.origin) != rc)) {
c.leaf.smap[keyFromSub(sub)]++
count := int32(1)
if len(sub.queue) > 0 && sub.qw > 0 {
count = sub.qw
}
c.leaf.smap[keyFromSub(sub)] += count
if c.leaf.tsub == nil {
c.leaf.tsub = make(map[*subscription]struct{})
}
Expand Down
138 changes: 138 additions & 0 deletions server/leafnode_test.go
Expand Up @@ -5045,3 +5045,141 @@ default_js_domain: {B:"DHUB"}
require_Equal(t, si.Cluster.Name, "HUB")

}

func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
/*
Topology: A cluster of leafnodes LN2 and LN3, connect
to a cluster C1, C2.
sub(foo) sub(foo)
\ /
C1 <-> C2
^ ^
| |
LN2 <-> LN3
/ \
sub(foo) sub(foo)
Once the above is set, start LN1 that connects to C1.
sub(foo) sub(foo)
\ /
LN1 -> C1 <-> C2
^ ^
| |
LN2 <-> LN3
/ \
sub(foo) sub(foo)
Remove subs to LN3, C2 and C1.
LN1 -> C1 <-> C2
^ ^
| |
LN2 <-> LN3
/
sub(foo)
Publish from LN1 and verify message is received by sub on LN2.
pub(foo)
\
LN1 -> C1 <-> C2
^ ^
| |
LN2 <-> LN3
/
sub(foo)
*/
co1 := DefaultOptions()
co1.LeafNode.Host = "127.0.0.1"
co1.LeafNode.Port = -1
co1.Cluster.Name = "ngs"
co1.Cluster.Host = "127.0.0.1"
co1.Cluster.Port = -1
c1 := RunServer(co1)
defer c1.Shutdown()

co2 := DefaultOptions()
co2.LeafNode.Host = "127.0.0.1"
co2.LeafNode.Port = -1
co2.Cluster.Name = "ngs"
co2.Cluster.Host = "127.0.0.1"
co2.Cluster.Port = -1
co2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", co1.Cluster.Port))
c2 := RunServer(co2)
defer c2.Shutdown()

checkClusterFormed(t, c1, c2)

lo2 := DefaultOptions()
lo2.Cluster.Name = "local"
lo2.Cluster.Host = "127.0.0.1"
lo2.Cluster.Port = -1
lo2.LeafNode.ReconnectInterval = 50 * time.Millisecond
lo2.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co1.LeafNode.Port)}}}}
ln2 := RunServer(lo2)
defer ln2.Shutdown()

lo3 := DefaultOptions()
lo3.Cluster.Name = "local"
lo3.Cluster.Host = "127.0.0.1"
lo3.Cluster.Port = -1
lo3.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", lo2.Cluster.Port))
lo3.LeafNode.ReconnectInterval = 50 * time.Millisecond
lo3.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co2.LeafNode.Port)}}}}
ln3 := RunServer(lo3)
defer ln3.Shutdown()

checkClusterFormed(t, ln2, ln3)
checkLeafNodeConnected(t, ln2)
checkLeafNodeConnected(t, ln3)

cln2 := natsConnect(t, ln2.ClientURL())
defer cln2.Close()
sln2 := natsQueueSubSync(t, cln2, "foo", "qgroup")
natsFlush(t, cln2)

cln3 := natsConnect(t, ln3.ClientURL())
defer cln3.Close()
sln3 := natsQueueSubSync(t, cln3, "foo", "qgroup")
natsFlush(t, cln3)

cc1 := natsConnect(t, c1.ClientURL())
defer cc1.Close()
sc1 := natsQueueSubSync(t, cc1, "foo", "qgroup")
natsFlush(t, cc1)

cc2 := natsConnect(t, c2.ClientURL())
defer cc2.Close()
sc2 := natsQueueSubSync(t, cc2, "foo", "qgroup")
natsFlush(t, cc2)

checkSubInterest(t, c1, globalAccountName, "foo", time.Second)
checkSubInterest(t, c2, globalAccountName, "foo", time.Second)
checkSubInterest(t, ln2, globalAccountName, "foo", time.Second)
checkSubInterest(t, ln3, globalAccountName, "foo", time.Second)

lo1 := DefaultOptions()
lo1.LeafNode.ReconnectInterval = 50 * time.Millisecond
lo1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{{Scheme: "nats", Host: fmt.Sprintf("127.0.0.1:%d", co1.LeafNode.Port)}}}}
ln1 := RunServer(lo1)
defer ln1.Shutdown()

checkLeafNodeConnected(t, ln1)
checkSubInterest(t, ln1, globalAccountName, "foo", time.Second)

sln3.Unsubscribe()
natsFlush(t, cln3)
sc2.Unsubscribe()
natsFlush(t, cc2)
sc1.Unsubscribe()
natsFlush(t, cc1)

cln1 := natsConnect(t, ln1.ClientURL())
defer cln1.Close()

natsPub(t, cln1, "foo", []byte("hello"))
natsNexMsg(t, sln2, time.Second)
}

0 comments on commit 52e3b4b

Please sign in to comment.