Skip to content

Commit

Permalink
xds: export XDSClient interface and use it in balancer tests (#4510)
Browse files Browse the repository at this point in the history
- xdsclient.New returns the interface now
- xdsclient.SetClient and xdsclient.FromResolverState take and return the interface now
- cleanup xds balancer tests to pass xds_client in resolver state
  • Loading branch information
menghanl committed Jun 8, 2021
1 parent 7301a31 commit b1418a6
Show file tree
Hide file tree
Showing 26 changed files with 173 additions and 304 deletions.
20 changes: 2 additions & 18 deletions xds/csds/csds.go
Expand Up @@ -38,33 +38,17 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
)

// xdsClient contains methods from xdsClient.Client which are used by
// the server. This is useful for overriding in unit tests.
type xdsClient interface {
DumpLDS() (string, map[string]xdsclient.UpdateWithMD)
DumpRDS() (string, map[string]xdsclient.UpdateWithMD)
DumpCDS() (string, map[string]xdsclient.UpdateWithMD)
DumpEDS() (string, map[string]xdsclient.UpdateWithMD)
BootstrapConfig() *bootstrap.Config
Close()
}

var (
logger = grpclog.Component("xds")
newXDSClient = func() xdsClient {
newXDSClient = func() xdsclient.XDSClient {
c, err := xdsclient.New()
if err != nil {
// If err is not nil, c is a typed nil (of type *xdsclient.Client).
// If c is returned and assigned to the xdsClient field in the CSDS
// server, the nil checks in the handlers will not handle it
// properly.
logger.Warningf("failed to create xds client: %v", err)
return nil
}
Expand All @@ -76,7 +60,7 @@ var (
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practice. But we keep a copy in each
// server instance for testing.
xdsClient xdsClient
xdsClient xdsclient.XDSClient
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
Expand Down
13 changes: 3 additions & 10 deletions xds/csds/csds_test.go
Expand Up @@ -59,13 +59,6 @@ const (
defaultTestTimeout = 10 * time.Second
)

type xdsClientWithWatch interface {
WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsclient.EndpointsUpdate, error)) func()
}

var cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmp.Comparer(func(a, b *timestamppb.Timestamp) bool { return true }),
Expand Down Expand Up @@ -250,7 +243,7 @@ func TestCSDS(t *testing.T) {
}
}

func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
func commonSetup(t *testing.T) (xdsclient.XDSClient, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()

// Spin up a xDS management server on a local port.
Expand All @@ -275,7 +268,7 @@ func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, strin
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClient { return xdsC }
newXDSClient = func() xdsclient.XDSClient { return xdsC }

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
Expand Down Expand Up @@ -635,7 +628,7 @@ func protoToJSON(p proto.Message) string {

func TestCSDSNoXDSClient(t *testing.T) {
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClient { return nil }
newXDSClient = func() xdsclient.XDSClient { return nil }
defer func() { newXDSClient = oldNewXDSClient }()

// Initialize an gRPC server and register CSDS on it.
Expand Down
8 changes: 2 additions & 6 deletions xds/googledirectpath/googlec2p.go
Expand Up @@ -62,15 +62,11 @@ const (
dnsName, xdsName = "dns", "xds"
)

type xdsClient interface {
Close()
}

// For overriding in unittests.
var (
onGCE = googlecloud.OnGCE

newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
return xdsclient.NewWithConfig(config)
}

Expand Down Expand Up @@ -139,7 +135,7 @@ func (c2pResolverBuilder) Scheme() string {

type c2pResolver struct {
resolver.Resolver
client xdsClient
client xdsclient.XDSClient
}

func (r *c2pResolver) Close() {
Expand Down
4 changes: 3 additions & 1 deletion xds/googledirectpath/googlec2p_test.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestBuildNotOnGCE(t *testing.T) {
}

type testXDSClient struct {
xdsclient.XDSClient
closed chan struct{}
}

Expand Down Expand Up @@ -177,7 +179,7 @@ func TestBuildXDS(t *testing.T) {

configCh := make(chan *bootstrap.Config, 1)
oldNewClient := newClientWithConfig
newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
configCh <- config
return tXDSClient, nil
}
Expand Down
35 changes: 2 additions & 33 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
Expand All @@ -59,7 +58,6 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient func() (xdsClient, error)
buildProvider = buildProviderFunc
)

Expand All @@ -84,17 +82,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
}
b.logger = prefixLogger((b))
b.logger.Infof("Created")

if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsClient = client
}

var creds credentials.TransportCredentials
switch {
case opts.DialCreds != nil:
Expand Down Expand Up @@ -137,14 +124,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
return &cfg, nil
}

// xdsClient contains methods from xdsClient.Client which are used by
// the cdsBalancer. This will be faked out in unittests.
type xdsClient interface {
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
BootstrapConfig() *bootstrap.Config
Close()
}

// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
// watcher with the xdsClient, while a non-nil error causes it to cancel the
Expand Down Expand Up @@ -184,7 +163,7 @@ type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsClient // xDS client to watch Cluster resource.
xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource.
cancelWatch func() // Cluster watch cancel func.
edsLB balancer.Balancer // EDS child policy.
clusterToWatch string
Expand Down Expand Up @@ -361,15 +340,8 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
lbCfg.LrsLoadReportingServerName = new(string)

}
resolverState := resolver.State{}
// Include the xds client for the child LB policies to use. For unit
// tests, b.xdsClient may not be a full *xdsclient.Client, but it will
// always be in production.
if c, ok := b.xdsClient.(*xdsclient.Client); ok {
resolverState = xdsclient.SetClient(resolverState, c)
}
ccState := balancer.ClientConnState{
ResolverState: resolverState,
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
Expand Down Expand Up @@ -407,9 +379,6 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
if newXDSClient != nil {
b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down
Expand Up @@ -133,11 +133,7 @@ func (p *fakeProvider) Close() {
// xDSCredentials.
func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
t.Helper()

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
Expand All @@ -164,7 +160,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

// Push a ClientConnState update to the CDS balancer with a cluster name.
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}

Expand All @@ -181,8 +177,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newXDSClient = oldNewXDSClient
newEDSBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}

Expand Down
31 changes: 16 additions & 15 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -129,7 +128,10 @@ func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS
return err
}
gotCCS := ccs.(balancer.ClientConnState)
if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreUnexported(attributes.Attributes{})) {
if xdsclient.FromResolverState(gotCCS.ResolverState) == nil {
return fmt.Errorf("want resolver state with XDSClient attached, got one without")
}
if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")) {
return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS)
}
return nil
Expand Down Expand Up @@ -173,7 +175,7 @@ func (tb *testEDSBalancer) waitForClose(ctx context.Context) error {

// cdsCCS is a helper function to construct a good update passed from the
// xdsResolver to the cdsBalancer.
func cdsCCS(cluster string) balancer.ClientConnState {
func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {
const cdsLBConfig = `{
"loadBalancingConfig":[
{
Expand All @@ -185,9 +187,9 @@ func cdsCCS(cluster string) balancer.ClientConnState {
}`
jsonSC := fmt.Sprintf(cdsLBConfig, cluster)
return balancer.ClientConnState{
ResolverState: resolver.State{
ResolverState: xdsclient.SetClient(resolver.State{
ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC),
},
}, xdsC),
BalancerConfig: &lbConfig{ClusterName: clusterName},
}
}
Expand All @@ -211,11 +213,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientCon
// newEDSBalancer function to return it), and also returns a cleanup function.
func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
t.Helper()

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
Expand All @@ -232,7 +230,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newXDSClient = oldNewXDSClient
xdsC.Close()
}
}

Expand All @@ -242,7 +240,7 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
t.Helper()

xdsC, cdsB, edsB, tcc, cancel := setup(t)
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}

Expand All @@ -262,6 +260,9 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
// cdsBalancer with different inputs and verifies that the CDS watch API on the
// provided xdsClient is invoked appropriately.
func (s) TestUpdateClientConnState(t *testing.T) {
xdsC := fakeclient.NewClient()
defer xdsC.Close()

tests := []struct {
name string
ccs balancer.ClientConnState
Expand All @@ -280,14 +281,14 @@ func (s) TestUpdateClientConnState(t *testing.T) {
},
{
name: "happy-good-case",
ccs: cdsCCS(clusterName),
ccs: cdsCCS(clusterName, xdsC),
wantCluster: clusterName,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
xdsC, cdsB, _, _, cancel := setup(t)
_, cdsB, _, _, cancel := setup(t)
defer func() {
cancel()
cdsB.Close()
Expand Down Expand Up @@ -324,7 +325,7 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
}()

// This is the same clientConn update sent in setupWithWatch().
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}
// The above update should not result in a new watch being registered.
Expand Down Expand Up @@ -660,7 +661,7 @@ func (s) TestClose(t *testing.T) {

// Make sure that the UpdateClientConnState() method on the CDS balancer
// returns error.
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != errBalancerClosed {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != errBalancerClosed {
t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed)
}

Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Expand Up @@ -40,7 +40,7 @@ type clusterHandler struct {
// CDS Balancer cares about is the most recent update.
updateChannel chan clusterHandlerUpdate

xdsClient xdsClient
xdsClient xdsclient.XDSClient
}

func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
Expand Down Expand Up @@ -112,7 +112,7 @@ type clusterNode struct {

// CreateClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsClient, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode {
c := &clusterNode{
clusterHandler: topLevelHandler,
}
Expand Down

0 comments on commit b1418a6

Please sign in to comment.