Skip to content

Commit

Permalink
xds: Handle loops and ignore duplicates in aggregated cluster handling (
Browse files Browse the repository at this point in the history
#5317)

xds: Handle loops and ignore duplicates in aggregated cluster handling
  • Loading branch information
zasweq committed May 5, 2022
1 parent 799605c commit ee67b3d
Show file tree
Hide file tree
Showing 2 changed files with 531 additions and 56 deletions.
163 changes: 107 additions & 56 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Expand Up @@ -24,7 +24,12 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
// maxDepth bounds the depth of the aggregate cluster graph. A node at depth
// maxDepth-1 that receives an aggregate update listing children is put into
// the errExceedsMaxDepth error state instead of creating those children.
const maxDepth = 16

var (
// errNotReceivedUpdate is returned while building a cluster update when a
// node in the graph has not received an update yet.
errNotReceivedUpdate = errors.New("tried to construct a cluster update on a cluster that has not received an update")
// errExceedsMaxDepth is reported when the aggregate cluster graph would
// grow deeper than maxDepth.
errExceedsMaxDepth = errors.New("aggregate cluster graph exceeds max depth")
)

// clusterHandlerUpdate wraps the information received from the registered CDS
// watcher. A non-nil error is propagated to the underlying cluster_resolver
Expand Down Expand Up @@ -54,9 +59,10 @@ type clusterHandler struct {

// A mutex to protect entire tree of clusters.
clusterMutex sync.Mutex
root *clusterNode
rootClusterName string

createdClusters map[string]*clusterNode

// A way to ping CDS Balancer about any updates or errors to a Node in the
// tree. This will either get called from this handler constructing an
// update or from a child with an error. Capacity of one as the only update
Expand All @@ -66,39 +72,48 @@ type clusterHandler struct {

// newClusterHandler returns a clusterHandler bound to the given parent
// cdsBalancer, with an update channel of capacity one and an empty
// createdClusters map.
func newClusterHandler(parent *cdsBalancer) *clusterHandler {
return &clusterHandler{
parent: parent,
updateChannel: make(chan clusterHandlerUpdate, 1),
parent: parent,
updateChannel: make(chan clusterHandlerUpdate, 1),
createdClusters: make(map[string]*clusterNode),
}
}

func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
ch.clusterMutex.Lock()
defer ch.clusterMutex.Unlock()
if ch.root == nil {
if ch.createdClusters[ch.rootClusterName] == nil {
// Construct a root node on first update.
ch.root = createClusterNode(rootClusterName, ch.parent.xdsClient, ch)
createClusterNode(rootClusterName, ch.parent.xdsClient, ch, 0)
ch.rootClusterName = rootClusterName
return
}
// Check if root cluster was changed. If it was, delete old one and start
// new one, if not do nothing.
if rootClusterName != ch.rootClusterName {
ch.root.delete()
ch.root = createClusterNode(rootClusterName, ch.parent.xdsClient, ch)
ch.createdClusters[ch.rootClusterName].delete()
createClusterNode(rootClusterName, ch.parent.xdsClient, ch, 0)
ch.rootClusterName = rootClusterName
}
}

// This function tries to construct a cluster update to send to CDS.
func (ch *clusterHandler) constructClusterUpdate() {
if ch.root == nil {
if ch.createdClusters[ch.rootClusterName] == nil {
// If there is no root node, this handler is closed; ignore the update.
return
}
clusterUpdate, err := ch.root.constructClusterUpdate()
clusterUpdate, err := ch.createdClusters[ch.rootClusterName].constructClusterUpdate(make(map[string]bool))
if err != nil {
// If there was an error received no op, as this simply means one of the
// children hasn't received an update yet.
// An error here is a no-op: it can mean that one of the children hasn't
// received an update yet, or that the graph is still in an error state. In
// the latter case no new error needs to be written to the update buffer,
// as that would be redundant information.
return
}
if clusterUpdate == nil {
// This means that there was an aggregated cluster with no EDS or DNS as
// leaf nodes. No update to be written.
return
}
// For a ClusterUpdate, the only update CDS cares about is the most
Expand All @@ -109,8 +124,8 @@ func (ch *clusterHandler) constructClusterUpdate() {
default:
}
ch.updateChannel <- clusterHandlerUpdate{
securityCfg: ch.root.clusterUpdate.SecurityCfg,
lbPolicy: ch.root.clusterUpdate.LBPolicy,
securityCfg: ch.createdClusters[ch.rootClusterName].clusterUpdate.SecurityCfg,
lbPolicy: ch.createdClusters[ch.rootClusterName].clusterUpdate.LBPolicy,
updates: clusterUpdate,
}
}
Expand All @@ -120,11 +135,10 @@ func (ch *clusterHandler) constructClusterUpdate() {
func (ch *clusterHandler) close() {
ch.clusterMutex.Lock()
defer ch.clusterMutex.Unlock()
if ch.root == nil {
if ch.createdClusters[ch.rootClusterName] == nil {
return
}
ch.root.delete()
ch.root = nil
ch.createdClusters[ch.rootClusterName].delete()
ch.rootClusterName = ""
}

Expand All @@ -136,7 +150,7 @@ type clusterNode struct {
cancelFunc func()

// A list of children, as the Node can be an aggregate Cluster.
children []*clusterNode
children []string

// A ClusterUpdate in order to build a list of cluster updates for CDS to
// send down to child XdsClusterResolverLoadBalancingPolicy.
Expand All @@ -149,13 +163,30 @@ type clusterNode struct {
receivedUpdate bool

clusterHandler *clusterHandler

depth int32
refCount int32

// maxDepthErr is set if this cluster node is an aggregate cluster and has a
// child that causes the graph to exceed the maximum depth allowed. This is
// used to show a cluster graph as being in an error state when it constructs
// a cluster update.
maxDepthErr error
}

// createClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler, depth int32) {
// If the cluster has already been created, only bump its reference count
// and return, which ignores duplicates.
if topLevelHandler.createdClusters[clusterName] != nil {
topLevelHandler.createdClusters[clusterName].refCount++
return
}
c := &clusterNode{
clusterHandler: topLevelHandler,
depth: depth,
refCount: 1,
}
// Communicate with the xds client here.
topLevelHandler.parent.logger.Infof("CDS watch started on %v", clusterName)
Expand All @@ -164,25 +195,43 @@ func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLev
topLevelHandler.parent.logger.Infof("CDS watch canceled on %v", clusterName)
cancel()
}
return c
topLevelHandler.createdClusters[clusterName] = c
}

// This function cancels the cluster watch on the cluster and all of its
// children.
func (c *clusterNode) delete() {
c.cancelFunc()
for _, child := range c.children {
child.delete()
c.refCount--
if c.refCount == 0 {
c.cancelFunc()
delete(c.clusterHandler.createdClusters, c.clusterUpdate.ClusterName)
for _, child := range c.children {
if c.clusterHandler.createdClusters[child] != nil {
c.clusterHandler.createdClusters[child].delete()
}
}
}
}

// Construct cluster update (potentially a list of ClusterUpdates) for a node.
func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, error) {
func (c *clusterNode) constructClusterUpdate(clustersSeen map[string]bool) ([]xdsresource.ClusterUpdate, error) {
// If the cluster has not yet received an update, the cluster update is not
// yet ready.
if !c.receivedUpdate {
return nil, errNotReceivedUpdate
}
if c.maxDepthErr != nil {
return nil, c.maxDepthErr
}
// Ignore duplicates. It's ok to ignore duplicates because the second
// occurrence of a cluster will never be used. I.e. in [C, D, C], the second
// C will never be used (the only way to fall back to lower priority D is if
// C is down, which means second C will never be chosen). Thus, [C, D, C] is
// logically equivalent to [C, D].
if clustersSeen[c.clusterUpdate.ClusterName] {
return []xdsresource.ClusterUpdate{}, nil
}
clustersSeen[c.clusterUpdate.ClusterName] = true

// Base case - LogicalDNS or EDS. Both of these cluster types will be tied
// to a single ClusterUpdate.
Expand All @@ -194,7 +243,7 @@ func (c *clusterNode) constructClusterUpdate() ([]xdsresource.ClusterUpdate, err
// its children.
var childrenUpdates []xdsresource.ClusterUpdate
for _, child := range c.children {
childUpdateList, err := child.constructClusterUpdate()
childUpdateList, err := c.clusterHandler.createdClusters[child].constructClusterUpdate(clustersSeen)
if err != nil {
return nil, err
}
Expand All @@ -219,6 +268,8 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er
default:
}
c.clusterHandler.updateChannel <- clusterHandlerUpdate{err: err}
c.receivedUpdate = false
c.maxDepthErr = nil
return
}

Expand All @@ -233,9 +284,10 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er
// cluster.
if clusterUpdate.ClusterType != xdsresource.ClusterTypeAggregate {
for _, child := range c.children {
child.delete()
c.clusterHandler.createdClusters[child].delete()
}
c.children = nil
c.maxDepthErr = nil
// This is an update in the one leaf node, should try to send an update
// to the parent CDS balancer.
//
Expand All @@ -248,6 +300,22 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er
}

// Aggregate cluster handling.
if len(clusterUpdate.PrioritizedClusterNames) >= 1 {
if c.depth == maxDepth-1 {
// For a ClusterUpdate, the only update CDS cares about is the most
// recent one, so opportunistically drain the update channel before
// sending the new update.
select {
case <-c.clusterHandler.updateChannel:
default:
}
c.clusterHandler.updateChannel <- clusterHandlerUpdate{err: errExceedsMaxDepth}
c.children = []string{}
c.maxDepthErr = errExceedsMaxDepth
return
}
}

newChildren := make(map[string]bool)
for _, childName := range clusterUpdate.PrioritizedClusterNames {
newChildren[childName] = true
Expand All @@ -261,59 +329,42 @@ func (c *clusterNode) handleResp(clusterUpdate xdsresource.ClusterUpdate, err er
// the update to build (ex. if a child is created and a watch is started,
// that child hasn't received an update yet due to the mutex lock on this
// callback).
var createdChild, deletedChild bool
var createdChild bool

// This map will represent the current children of the cluster. It will be
// first added to in order to represent the new children. It will then have
// any children deleted that are no longer present. Then, from the cluster
// update received, will be used to construct the new child list.
mapCurrentChildren := make(map[string]*clusterNode)
// any children deleted that are no longer present.
mapCurrentChildren := make(map[string]bool)
for _, child := range c.children {
mapCurrentChildren[child.clusterUpdate.ClusterName] = child
mapCurrentChildren[child] = true
}

// Add and construct any new child nodes.
for child := range newChildren {
if _, inChildrenAlready := mapCurrentChildren[child]; !inChildrenAlready {
createdChild = true
mapCurrentChildren[child] = createClusterNode(child, c.clusterHandler.parent.xdsClient, c.clusterHandler)
createClusterNode(child, c.clusterHandler.parent.xdsClient, c.clusterHandler, c.depth+1)
}
}

// Delete any child nodes no longer in the aggregate cluster's children.
for child := range mapCurrentChildren {
if _, stillAChild := newChildren[child]; !stillAChild {
deletedChild = true
mapCurrentChildren[child].delete()
c.clusterHandler.createdClusters[child].delete()
delete(mapCurrentChildren, child)
}
}

// The order of the children list matters, so use the clusterUpdate from
// xdsclient as the ordering, and use that logical ordering for the new
// children list. This will be a mixture of child nodes which are all
// already constructed in the mapCurrentChildrenMap.
var children = make([]*clusterNode, 0, len(clusterUpdate.PrioritizedClusterNames))

for _, orderedChild := range clusterUpdate.PrioritizedClusterNames {
// The clusters already have watches started for them in xds client, so
// you can use these pointers to construct the new children list, you
// just have to put them in the correct order using the original cluster
// update.
currentChild := mapCurrentChildren[orderedChild]
children = append(children, currentChild)
}

c.children = children
c.children = clusterUpdate.PrioritizedClusterNames

c.maxDepthErr = nil
// If the cluster is an aggregate cluster, if this callback created any new
// child cluster nodes, then there's no possibility for a full cluster
// update to successfully build, as those created children will not have
// received an update yet. However, if there was simply a child deleted,
// then there is a possibility that it will have a full cluster update to
// build and also will have a changed overall cluster update from the
// deleted child.
if deletedChild && !createdChild {
// received an update yet. Even if this update did not delete a child, there
// is still a possibility for the cluster update to build, as the aggregate
// cluster can ignore duplicated children and thus the update can fill out
// the full cluster update tree.
if !createdChild {
c.clusterHandler.constructClusterUpdate()
}
}

0 comments on commit ee67b3d

Please sign in to comment.