Skip to content

Commit

Permalink
[xds_client_federation_multi_controller] fed
Browse files Browse the repository at this point in the history
- added an authority struct
  - consists of controller (the xds stream) and an pubsub (cache for resources received from this authority)

- an authority is created when the first watch is called on this authority, and deleted when the last watch is canceled
  - authorities are ref-counted
  - authorities are kept in a `map[server-config]authority`, note that key is the config, not authority name, so that two authoritie with different names but same config will share the xds stream and pubsub
  - authority is kept in a cache after deletion, and only actually deleted after a timeout, so it can be revived if a new watch is started

- watch functions find (or build) the authority first, then call watch on this authority
  - they also handle ref/unref of the authority

- dump (for CSDS) is updated to dump resources from all the authorities

- resource handling (when unmarshalling receive xds resources) parses the resource name first, and then turns it back to a string, to canonicalize (especially the context parameters)

- tests
  - update handler is only available after the watch is started, so many tests need updating
  - make many LDS/RDS/CDS/EDS tests share code, and also added a federation version
  • Loading branch information
menghanl committed Dec 8, 2021
1 parent d35aff3 commit 026827e
Show file tree
Hide file tree
Showing 20 changed files with 1,572 additions and 1,706 deletions.
230 changes: 230 additions & 0 deletions xds/internal/xdsclient/authority.go
@@ -0,0 +1,230 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdsclient

import (
"errors"
"fmt"

"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/pubsub"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

const federationScheme = "xdstp"

// findAuthority returns the authority for this name. If it doesn't already
// exist, one will be created.
//
// Note that this doesn't always create new authority. authorities with the same
// config but different names are shared.
//
// Caller must not hold c.authorityMu.
func (c *clientImpl) findAuthority(n *xdsresource.Name) (retA *authority, _ error) {
scheme, authority := n.Scheme, n.Authority

c.authorityMu.Lock()
defer c.authorityMu.Unlock()
if c.done.HasFired() {
return nil, errors.New("the xds-client is closed")
}

defer func() {
// All returned authority from this function will be used by a watch,
// holding the ref here.
//
// Note that this must be done while c.authorityMu is held, to avoid the
// race that an authority is returned, but before the watch starts, the
// old last watch is canceled (in another goroutine), causing this
// authority to be removed, and then a watch will start on a removed
// authority.
//
// unref() will be done when the watch is canceled.
if retA != nil {
retA.ref()
}
}()

var config *bootstrap.ServerConfig
if scheme != federationScheme {
config = c.config.XDSServer
} else {
authConfig, ok := c.config.Authorities[authority]
if !ok {
return nil, fmt.Errorf("xds: failed to find authority %q", authority)
}
config = authConfig.XDSServer
}

a, err := c.newAuthority(config)
if err != nil {
return nil, fmt.Errorf("xds: failed to connect to the control plane for authority %q: %v", authority, err)
}
return a, nil
}

// newAuthority creates a new authority for the config. But before that, it
// checks the cache to see if an authority for this config already exists.
//
// caller must hold c.authorityMu
func (c *clientImpl) newAuthority(config *bootstrap.ServerConfig) (_ *authority, retErr error) {
// First check if there's already an authority for this config. If found, it
// means there was another watch with a different authority name but the
// same server config. Return it.
configStr := config.String()
if a, ok := c.authorityPerConfig[configStr]; ok {
return a, nil
}
// Second check if there's an authority in the idle cache. If found, it
// means this authority was created, but moved to the idle cache because the
// watch was canceled. Move it from idle cache to the authority cache, and
// return.
if old, ok := c.idleAuthorityPerConfig.Remove(configStr); ok {
oldA, _ := old.(*authority)
if oldA != nil {
c.authorityPerConfig[configStr] = oldA
return oldA, nil
}
}

// Make a new authority since there's no existing authority for this config.
ret := &authority{config: config, pubsub: pubsub.New(c.watchExpiryTimeout, c.logger)}
defer func() {
if retErr != nil {
ret.close()
}
}()
ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger)
if err != nil {
return nil, err
}
ret.controller = ctr
// Add it to the cache, so it will be reused.
c.authorityPerConfig[configStr] = ret
return ret, nil
}

// unrefAuthority unrefs the authority. It also moves the authority to idle
// cache if it's ref count is 0.
//
// Caller must not hold c.authorityMu.
func (c *clientImpl) unrefAuthority(a *authority) {
c.authorityMu.Lock()
defer c.authorityMu.Unlock()
if a.unref() == 0 {
fmt.Println(" --- adding auth to idle cache")
configStr := a.config.String()
delete(c.authorityPerConfig, configStr)
c.idleAuthorityPerConfig.Add(configStr, a, func() {
a.close()
})
}
}

// authority is a combination of pubsub and the controller for this authority.
//
// Note that it might make sense to use one pubsub for all the resources (for
// all the controllers). One downside is the handling of StoW APIs (LDS/CDS).
// These responses contain all the resources from that control plane, so pubsub
// will need to keep lists of resources from each control plane, to know what
// are removed.
type authority struct {
config *bootstrap.ServerConfig

pubsub *pubsub.Pubsub
controller controllerInterface

refCount int
}

// caller must hold parent's authorityMu.
func (a *authority) ref() {
a.refCount++
}

// caller must hold parent's authorityMu.
func (a *authority) unref() int {
a.refCount--
return a.refCount
}

func (a *authority) close() {
if a.pubsub != nil {
a.pubsub.Close()
}
if a.controller != nil {
a.controller.Close()
}
}

func (a *authority) watchListener(serviceName string, cb func(xdsresource.ListenerUpdate, error)) (cancel func()) {
first, cancelF := a.pubsub.WatchListener(serviceName, cb)
if first {
a.controller.AddWatch(xdsresource.ListenerResource, serviceName)
}
return func() {
if cancelF() {
a.controller.RemoveWatch(xdsresource.ListenerResource, serviceName)
}
}
}

func (a *authority) watchRouteConfig(routeName string, cb func(xdsresource.RouteConfigUpdate, error)) (cancel func()) {
first, cancelF := a.pubsub.WatchRouteConfig(routeName, cb)
if first {
a.controller.AddWatch(xdsresource.RouteConfigResource, routeName)
}
return func() {
if cancelF() {
a.controller.RemoveWatch(xdsresource.RouteConfigResource, routeName)
}
}
}

func (a *authority) watchCluster(clusterName string, cb func(xdsresource.ClusterUpdate, error)) (cancel func()) {
first, cancelF := a.pubsub.WatchCluster(clusterName, cb)
if first {
a.controller.AddWatch(xdsresource.ClusterResource, clusterName)
}
return func() {
if cancelF() {
a.controller.RemoveWatch(xdsresource.ClusterResource, clusterName)
}
}
}

func (a *authority) watchEndpoints(clusterName string, cb func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
first, cancelF := a.pubsub.WatchEndpoints(clusterName, cb)
if first {
a.controller.AddWatch(xdsresource.EndpointsResource, clusterName)
}
return func() {
if cancelF() {
a.controller.RemoveWatch(xdsresource.EndpointsResource, clusterName)
}
}
}

func (a *authority) reportLoad(server string) (*load.Store, func()) {
return a.controller.ReportLoad(server)
}

func (a *authority) dump(t xdsresource.ResourceType) map[string]xdsresource.UpdateWithMD {
return a.pubsub.Dump(t)
}

0 comments on commit 026827e

Please sign in to comment.