diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go index a10d8d772f2..234511a45dc 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -24,7 +24,12 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) -var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update") +const maxDepth = 16 + +var ( + errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update") + errExceedsMaxDepth = errors.New("aggregate cluster graph exceeds max depth") +) // clusterHandlerUpdate wraps the information received from the registered CDS // watcher. A non-nil error is propagated to the underlying cluster_resolver @@ -54,9 +59,10 @@ type clusterHandler struct { // A mutex to protect entire tree of clusters. clusterMutex sync.Mutex - root *clusterNode rootClusterName string + createdClusters map[string]*clusterNode + // A way to ping CDS Balancer about any updates or errors to a Node in the // tree. This will either get called from this handler constructing an // update or from a child with an error. Capacity of one as the only update @@ -66,39 +72,48 @@ type clusterHandler struct { func newClusterHandler(parent *cdsBalancer) *clusterHandler { return &clusterHandler{ - parent: parent, - updateChannel: make(chan clusterHandlerUpdate, 1), + parent: parent, + updateChannel: make(chan clusterHandlerUpdate, 1), + createdClusters: make(map[string]*clusterNode), } } func (ch *clusterHandler) updateRootCluster(rootClusterName string) { ch.clusterMutex.Lock() defer ch.clusterMutex.Unlock() - if ch.root == nil { + if ch.createdClusters[ch.rootClusterName] == nil { // Construct a root node on first update. - ch.root = createClusterNode(rootClusterName, ch.parent.xdsClient, ch) + createClusterNode(rootClusterName, ch.parent.xdsClient, ch, 0) ch.rootClusterName = rootClusterName return } // Check if root cluster was changed. If it was, delete old one and start // new one, if not do nothing. if rootClusterName != ch.rootClusterName { - ch.root.delete() - ch.root = createClusterNode(rootClusterName, ch.parent.xdsClient, ch) + ch.createdClusters[ch.rootClusterName].delete() + createClusterNode(rootClusterName, ch.parent.xdsClient, ch, 0) ch.rootClusterName = rootClusterName } } // This function tries to construct a cluster update to send to CDS. func (ch *clusterHandler) constructClusterUpdate() { - if ch.root == nil { + if ch.createdClusters[ch.rootClusterName] == nil { // If root is nil, this handler is closed, ignore the update. return } - clusterUpdate, err := ch.root.constructClusterUpdate() + clusterUpdate, err := ch.createdClusters[ch.rootClusterName].constructClusterUpdate(make(map[string]bool)) if err != nil { - // If there was an error received no op, as this simply means one of the - // children hasn't received an update yet. + // If there was an error received no op, as this can mean one of the + // children hasn't received an update yet, or the graph continued to + // stay in an error state. If the graph continues to stay in an error + // state, no new error needs to be written to the update buffer as that + // would be redundant information. + return + } + if clusterUpdate == nil { + // This means that there was an aggregated cluster with no EDS or DNS as + // leaf nodes. No update to be written. 
return } // For a ClusterUpdate, the only update CDS cares about is the most @@ -109,8 +124,8 @@ func (ch *clusterHandler) constructClusterUpdate() { default: } ch.updateChannel <- clusterHandlerUpdate{ - securityCfg: ch.root.clusterUpdate.SecurityCfg, - lbPolicy: ch.root.clusterUpdate.LBPolicy, + securityCfg: ch.createdClusters[ch.rootClusterName].clusterUpdate.SecurityCfg, + lbPolicy: ch.createdClusters[ch.rootClusterName].clusterUpdate.LBPolicy, updates: clusterUpdate, } } @@ -120,11 +135,10 @@ func (ch *clusterHandler) constructClusterUpdate() { func (ch *clusterHandler) close() { ch.clusterMutex.Lock() defer ch.clusterMutex.Unlock() - if ch.root == nil { + if ch.createdClusters[ch.rootClusterName] == nil { return } - ch.root.delete() - ch.root = nil + ch.createdClusters[ch.rootClusterName].delete() ch.rootClusterName = "" } @@ -136,7 +150,7 @@ type clusterNode struct { cancelFunc func() // A list of children, as the Node can be an aggregate Cluster. - children []*clusterNode + children []string // A ClusterUpdate in order to build a list of cluster updates for CDS to // send down to child XdsClusterResolverLoadBalancingPolicy. @@ -149,13 +163,30 @@ type clusterNode struct { receivedUpdate bool clusterHandler *clusterHandler + + depth int32 + refCount int32 + + // maxDepthErr is set if this cluster node is an aggregate cluster and has a + // child that causes the graph to exceed the maximum depth allowed. This is + // used to show a cluster graph as being in an error state when it constructs + // a cluster update. + maxDepthErr error } // CreateClusterNode creates a cluster node from a given clusterName. This will // also start the watch for that cluster. -func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode { +func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler, depth int32) { + // If the cluster has already been created, simply return, which ignores + // duplicates. + if topLevelHandler.createdClusters[clusterName] != nil { + topLevelHandler.createdClusters[clusterName].refCount++ + return + } c := &clusterNode{ clusterHandler: topLevelHandler, + depth: depth, + refCount: 1, } // Communicate with the xds client here. topLevelHandler.parent.logger.Infof("CDS watch started on %v", clusterName) @@ -164,25 +195,43 @@ func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLev topLevelHandler.parent.logger.Infof("CDS watch canceled on %v", clusterName) cancel() } - return c + topLevelHandler.createdClusters[clusterName] = c } // This function cancels the cluster watch on the cluster and all of it's // children. func (c *clusterNode) delete() { - c.cancelFunc() - for _, child := range c.children { - child.delete() + c.refCount-- + if c.refCount == 0 { + c.cancelFunc() + delete(c.clusterHandler.createdClusters, c.clusterUpdate.ClusterName) + for _, child := range c.children { + if c.clusterHandler.createdClusters[child] != nil { + c.clusterHandler.createdClusters[child].delete() + } + } } } // Construct cluster update (potentially a list of ClusterUpdates) for a node. -func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, error) { +func (c *clusterNode) constructClusterUpdate(clustersSeen map[string]bool) ([]xdsresource.ClusterUpdate, error) { // If the cluster has not yet received an update, the cluster update is not // yet ready. 
if !c.receivedUpdate { return nil, errNotReceivedUpdate } + if c.maxDepthErr != nil { + return nil, c.maxDepthErr + } + // Ignore duplicates. It's ok to ignore duplicates because the second + // occurrence of a cluster will never be used. I.e. in [C, D, C], the second + // C will never be used (the only way to fall back to lower priority D is if + // C is down, which means second C will never be chosen). Thus, [C, D, C] is + // logically equivalent to [C, D]. + if clustersSeen[c.clusterUpdate.ClusterName] { + return []xdsresource.ClusterUpdate{}, nil + } + clustersSeen[c.clusterUpdate.ClusterName] = true // Base case - LogicalDNS or EDS. Both of these cluster types will be tied // to a single ClusterUpdate. @@ -194,7 +243,7 @@ func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, err // it's children. var childrenUpdates []xdsresource.ClusterUpdate for _, child := range c.children { - childUpdateList, err := child.constructClusterUpdate() + childUpdateList, err := c.clusterHandler.createdClusters[child].constructClusterUpdate(clustersSeen) if err != nil { return nil, err } @@ -219,6 +268,8 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er default: } c.clusterHandler.updateChannel <- clusterHandlerUpdate{err: err} + c.receivedUpdate = false + c.maxDepthErr = nil return } @@ -233,9 +284,10 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er // cluster. if clusterUpdate.ClusterType != xdsresource.ClusterTypeAggregate { for _, child := range c.children { - child.delete() + c.clusterHandler.createdClusters[child].delete() } c.children = nil + c.maxDepthErr = nil // This is an update in the one leaf node, should try to send an update // to the parent CDS balancer. // @@ -248,6 +300,22 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er } // Aggregate cluster handling. + if len(clusterUpdate.PrioritizedClusterNames) >= 1 { + if c.depth == maxDepth-1 { + // For a ClusterUpdate, the only update CDS cares about is the most + // recent one, so opportunistically drain the update channel before + // sending the new update. + select { + case <-c.clusterHandler.updateChannel: + default: + } + c.clusterHandler.updateChannel <- clusterHandlerUpdate{err: errExceedsMaxDepth} + c.children = []string{} + c.maxDepthErr = errExceedsMaxDepth + return + } + } + newChildren := make(map[string]bool) for _, childName := range clusterUpdate.PrioritizedClusterNames { newChildren[childName] = true @@ -261,59 +329,42 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er // the update to build (ex. if a child is created and a watch is started, // that child hasn't received an update yet due to the mutex lock on this // callback). - var createdChild, deletedChild bool + var createdChild bool // This map will represent the current children of the cluster. It will be // first added to in order to represent the new children. It will then have - // any children deleted that are no longer present. Then, from the cluster - // update received, will be used to construct the new child list. - mapCurrentChildren := make(map[string]*clusterNode) + // any children deleted that are no longer present. + mapCurrentChildren := make(map[string]bool) for _, child := range c.children { - mapCurrentChildren[child.clusterUpdate.ClusterName] = child + mapCurrentChildren[child] = true } // Add and construct any new child nodes. 
for child := range newChildren { if _, inChildrenAlready := mapCurrentChildren[child]; !inChildrenAlready { - createdChild = true - mapCurrentChildren[child] = createClusterNode(child, c.clusterHandler.parent.xdsClient, c.clusterHandler) + createClusterNode(child, c.clusterHandler.parent.xdsClient, c.clusterHandler, c.depth+1) } } // Delete any child nodes no longer in the aggregate cluster's children. for child := range mapCurrentChildren { if _, stillAChild := newChildren[child]; !stillAChild { - deletedChild = true - mapCurrentChildren[child].delete() + c.clusterHandler.createdClusters[child].delete() delete(mapCurrentChildren, child) } } - // The order of the children list matters, so use the clusterUpdate from - // xdsclient as the ordering, and use that logical ordering for the new - // children list. This will be a mixture of child nodes which are all - // already constructed in the mapCurrentChildrenMap. - var children = make([]*clusterNode, 0, len(clusterUpdate.PrioritizedClusterNames)) - - for _, orderedChild := range clusterUpdate.PrioritizedClusterNames { - // The cluster's already have watches started for them in xds client, so - // you can use these pointers to construct the new children list, you - // just have to put them in the correct order using the original cluster - // update. - currentChild := mapCurrentChildren[orderedChild] - children = append(children, currentChild) - } - - c.children = children + c.children = clusterUpdate.PrioritizedClusterNames + c.maxDepthErr = nil // If the cluster is an aggregate cluster, if this callback created any new // child cluster nodes, then there's no possibility for a full cluster // update to successfully build, as those created children will not have - // received an update yet. However, if there was simply a child deleted, - // then there is a possibility that it will have a full cluster update to - // build and also will have a changed overall cluster update from the - // deleted child. - if deletedChild && !createdChild { + // received an update yet. Even if this update did not delete a child, there + // is still a possibility for the cluster update to build, as the aggregate + // cluster can ignore duplicated children and thus the update can fill out + // the full cluster update tree. + if !createdChild { c.clusterHandler.constructClusterUpdate() } } diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go index b58f06d0eb2..caf10955014 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler_test.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler_test.go @@ -19,6 +19,7 @@ package cdsbalancer import ( "context" "errors" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -683,3 +684,426 @@ func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) { t.Fatal("Timed out waiting for update from update channel.") } } + +// TestExceedsMaxStackDepth tests the scenario where an aggregate cluster +// exceeds the maximum depth, which is 16. This should cause an error to be +// written to the update buffer. 
+func (s) TestExceedsMaxStackDepth(t *testing.T) { + ch, fakeClient := setupTests() + ch.updateRootCluster("cluster0") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + + for i := 0; i <= 15; i++ { + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "cluster" + fmt.Sprint(i), + PrioritizedClusterNames: []string{"cluster" + fmt.Sprint(i+1)}, + }, nil) + if i == 15 { + // The 16th iteration will try and create a cluster which exceeds + // max stack depth and will thus error, so no CDS Watch will be + // started for the child. + continue + } + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + } + select { + case chu := <-ch.updateChannel: + if chu.err.Error() != "aggregate cluster graph exceeds max depth" { + t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for an error to be written to update channel.") + } +} + +// TestDiamondDependency tests a diamond shaped aggregate cluster (A->[B,C]; +// B->D; C->D). Due to both B and C pointing to D as its child, D should be +// ignored for C. Once all 4 clusters have received a CDS update, an update +// should then be written to the update buffer, specifying a single Cluster D. +func (s) TestDiamondDependency(t *testing.T) { + ch, fakeClient := setupTests() + ch.updateRootCluster("clusterA") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterA", + PrioritizedClusterNames: []string{"clusterB", "clusterC"}, + }, nil) + // Two watches should be started for both child clusters. + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + // B -> D. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterB", + PrioritizedClusterNames: []string{"clusterD"}, + }, nil) + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + + // This shouldn't cause an update to be written to the update buffer, + // as cluster C has not received a cluster update yet. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterD", + }, nil) + + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-ch.updateChannel: + t.Fatal("an update should not have been written to the update buffer") + case <-sCtx.Done(): + } + + // This update for C should cause an update to be written to the update + // buffer. At this point, every node in the aggregate cluster graph has + // received an update. 
This update should only contain one clusterD, as + // clusterC does not add a clusterD child update due to the clusterD update + // already having been added as a child of clusterB. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterC", + PrioritizedClusterNames: []string{"clusterD"}, + }, nil) + + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterD", + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } +} + +// TestIgnoreDups tests the cluster (A->[B, C]; B->[C, D]). Only one watch +// should be started for cluster C. The update written to the update buffer +// should contain only one instance of cluster C, at a higher priority +// than D. +func (s) TestIgnoreDups(t *testing.T) { + ch, fakeClient := setupTests() + ch.updateRootCluster("clusterA") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterA", + PrioritizedClusterNames: []string{"clusterB", "clusterC"}, + }, nil) + // Two watches should be started, one for each child cluster. + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + // The child cluster C should not have a watch started for it, as it is + // already part of the aggregate cluster graph as the child of the root + // cluster clusterA and has already had a watch started for it. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterB", + PrioritizedClusterNames: []string{"clusterC", "clusterD"}, + }, nil) + // Only one watch should be started, which should be for clusterD. + name, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if name != "clusterD" { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: clusterD", name) + } + + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err = fakeClient.WaitForWatchCluster(sCtx); err == nil { + t.Fatalf("only one watch should have been started for the children of clusterB") + } + + // This update should not cause an update to be written to the update + // buffer, as not every cluster in the tree has received a cluster + // update yet. With cluster B ignoring cluster C, the system should function as + // if cluster C was not a child of cluster B (meaning all 4 clusters should + // be required to get an update). 
+ fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterC", + }, nil) + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-ch.updateChannel: + t.Fatal("an update should not have been written to the update buffer") + case <-sCtx.Done(): + } + + // This update causes all 4 clusters in the aggregated cluster graph to have + // received an update, so an update should be written to the update buffer + // with only a single occurrence of cluster C as a higher priority than + // cluster D. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterD", + }, nil) + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterC", + }, { + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterD", + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } + + // Delete A's ref to C by updating A with only child B. Since B still has a + // reference to C, C's watch should not be canceled, and also an update + // should correctly be built. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterA", + PrioritizedClusterNames: []string{"clusterB"}, + }, nil) + + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterC", + }, { + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "clusterD", + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } +} + +// TestErrorStateWholeTree tests the scenario where the aggregate cluster graph +// exceeds max depth. An error should be written to the update channel. +// Afterward, if a valid response comes in for another cluster, no update should +// be written to the update channel, as the aggregate cluster graph is still in +// the same error state. +func (s) TestErrorStateWholeTree(t *testing.T) { + ch, fakeClient := setupTests() + ch.updateRootCluster("cluster0") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + + for i := 0; i <= 15; i++ { + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "cluster" + fmt.Sprint(i), + PrioritizedClusterNames: []string{"cluster" + fmt.Sprint(i+1)}, + }, nil) + if i == 15 { + // The 16th iteration will try and create a cluster which exceeds + // max stack depth and will thus error, so no CDS Watch will be + // started for the child. 
+ continue + } + _, err = fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + } + select { + case chu := <-ch.updateChannel: + if chu.err.Error() != "aggregate cluster graph exceeds max depth" { + t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for an error to be written to update channel.") + } + + // Invoke a cluster callback for a node in the graph that rests within the + // allowed depth. This will cause the system to try and construct a cluster + // update, and it shouldn't write an update as the aggregate cluster graph + // is still in an error state. Since the graph continues to stay in an error + // state, no new error needs to be written to the update buffer as that + // would be redundant information. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "cluster3", + PrioritizedClusterNames: []string{"cluster4"}, + }, nil) + + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-ch.updateChannel: + t.Fatal("an update should not have been written to the update buffer") + case <-sCtx.Done(): + } + + // Invoke the same cluster update for cluster 15, specifying it has a child + // cluster16. This should cause an error to be written to the update buffer, + // as it still exceeds the max depth. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "cluster15", + PrioritizedClusterNames: []string{"cluster16"}, + }, nil) + select { + case chu := <-ch.updateChannel: + if chu.err.Error() != "aggregate cluster graph exceeds max depth" { + t.Fatalf("Did not receive the expected error, instead received: %v", chu.err.Error()) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for an error to be written to update channel.") + } + + // Removing the child of cluster15 that causes the graph to be in the + // error state of exceeding max depth should allow the update to successfully + // construct and be written to the update buffer. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "cluster15", + }, nil) + + select { + case chu := <-ch.updateChannel: + if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{{ + ClusterType: xdsresource.ClusterTypeEDS, + ClusterName: "cluster15", + }}); diff != "" { + t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) + } + case <-ctx.Done(): + t.Fatal("Timed out waiting for the cluster update to be written to the update buffer.") + } +} + +// TestNodeChildOfItself tests the scenario where the aggregate cluster graph +// has a node that is a child of itself. The case for this is A -> A, and +// since there is no base cluster (EDS or Logical DNS), no update should be +// written if it tries to build a cluster update. +func (s) TestNodeChildOfItself(t *testing.T) { + ch, fakeClient := setupTests() + ch.updateRootCluster("clusterA") + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + _, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + // Invoke the callback informing the cluster handler that clusterA has a + // child that is itself. 
Due to this child cluster being a duplicate, no + // watch should be started. Since there are no leaf nodes (i.e. EDS or + // Logical DNS), no update should be written to the update buffer. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterA", + PrioritizedClusterNames: []string{"clusterA"}, + }, nil) + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := fakeClient.WaitForWatchCluster(sCtx); err == nil { + t.Fatal("Watch should not have been started for clusterA") + } + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-ch.updateChannel: + t.Fatal("update should not have been written to the update buffer") + case <-sCtx.Done(): + } + + // Invoke the callback again informing the cluster handler that clusterA has + // a child that is itself. Due to this child cluster being a duplicate, + // no watch should be started. Since there are no leaf nodes (i.e. EDS or + // Logical DNS), no update should be written to the update buffer. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterA", + PrioritizedClusterNames: []string{"clusterA"}, + }, nil) + + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := fakeClient.WaitForWatchCluster(sCtx); err == nil { + t.Fatal("Watch should not have been started for clusterA") + } + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-ch.updateChannel: + t.Fatal("update should not have been written to the update buffer, as there are no leaf clusters") + case <-sCtx.Done(): + } + + // Inform the cluster handler that clusterA now has clusterB as a child. + // This should not cancel the watch for A, as it is still the root cluster + // and still has a positive ref count; it should not write an update to the + // update buffer, as cluster B has not received an update yet; and it should + // start a new watch for cluster B, as it is not a duplicate. + fakeClient.InvokeWatchClusterCallback(xdsresource.ClusterUpdate{ + ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: "clusterA", + PrioritizedClusterNames: []string{"clusterB"}, + }, nil) + + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := fakeClient.WaitForCancelClusterWatch(sCtx); err == nil { + t.Fatal("clusterA should not have been canceled, as it is still the root cluster") + } + + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-ch.updateChannel: + t.Fatal("update should not have been written to the update buffer, as clusterB has not received an update yet") + case <-sCtx.Done(): + } + + gotCluster, err := fakeClient.WaitForWatchCluster(ctx) + if err != nil { + t.Fatalf("xdsClient.WatchCDS failed with error: %v", err) + } + if gotCluster != "clusterB" { + t.Fatalf("xdsClient.WatchCDS called for cluster: %v, want: %v", gotCluster, "clusterB") + } +}
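
Reviewer note (not part of the patch): the sketch below is a minimal, self-contained illustration of the two ideas this change relies on: deduplicating clusters with a seen-set during the depth-first walk that builds the ordered update list, and rejecting aggregate graphs that exceed maxDepth. The node type, the buildList function, and the way depth is tracked here are hypothetical simplifications for illustration only; they do not mirror the patched clusterHandler/clusterNode APIs (no ref counting, no watches, no update channel).

package main

import (
	"errors"
	"fmt"
)

const maxDepth = 16

var errExceedsMaxDepth = errors.New("aggregate cluster graph exceeds max depth")

// node is a hypothetical, simplified stand-in for clusterNode: a name, an
// ordered list of children (empty for a leaf), and the depth assigned when
// the node was created.
type node struct {
	name     string
	children []*node
	depth    int
}

// buildList walks the graph depth-first, keeping the priority order of the
// children while skipping any cluster that has already been emitted. A
// repeated cluster can be skipped because only its first, higher-priority
// occurrence can ever be picked.
func buildList(n *node, seen map[string]bool) ([]string, error) {
	if n.depth >= maxDepth {
		return nil, errExceedsMaxDepth
	}
	if seen[n.name] {
		return nil, nil
	}
	seen[n.name] = true
	if len(n.children) == 0 {
		// Leaf (EDS or LogicalDNS in the real code).
		return []string{n.name}, nil
	}
	var out []string
	for _, child := range n.children {
		list, err := buildList(child, seen)
		if err != nil {
			return nil, err
		}
		out = append(out, list...)
	}
	return out, nil
}

func main() {
	// Diamond: A -> [B, C]; B -> [D]; C -> [D]. D is emitted exactly once.
	d := &node{name: "D", depth: 2}
	b := &node{name: "B", children: []*node{d}, depth: 1}
	c := &node{name: "C", children: []*node{d}, depth: 1}
	a := &node{name: "A", children: []*node{b, c}, depth: 0}

	list, err := buildList(a, make(map[string]bool))
	fmt.Println(list, err) // [D] <nil>
}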