Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: export XDSClient interface and use it in balancer tests #4510

Merged
merged 16 commits into from Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 2 additions & 18 deletions xds/csds/csds.go
Expand Up @@ -38,33 +38,17 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/timestamppb"

_ "google.golang.org/grpc/xds/internal/xdsclient/v2" // Register v2 xds_client.
_ "google.golang.org/grpc/xds/internal/xdsclient/v3" // Register v3 xds_client.
)

// xdsClient contains methods from xdsClient.Client which are used by
// the server. This is useful for overriding in unit tests.
type xdsClient interface {
DumpLDS() (string, map[string]xdsclient.UpdateWithMD)
DumpRDS() (string, map[string]xdsclient.UpdateWithMD)
DumpCDS() (string, map[string]xdsclient.UpdateWithMD)
DumpEDS() (string, map[string]xdsclient.UpdateWithMD)
BootstrapConfig() *bootstrap.Config
Close()
}

var (
logger = grpclog.Component("xds")
newXDSClient = func() xdsClient {
newXDSClient = func() xdsclient.XDSClient {
c, err := xdsclient.New()
if err != nil {
// If err is not nil, c is a typed nil (of type *xdsclient.Client).
// If c is returned and assigned to the xdsClient field in the CSDS
// server, the nil checks in the handlers will not handle it
// properly.
logger.Warningf("failed to create xds client: %v", err)
return nil
}
Expand All @@ -76,7 +60,7 @@ var (
type ClientStatusDiscoveryServer struct {
// xdsClient will always be the same in practice. But we keep a copy in each
// server instance for testing.
xdsClient xdsClient
xdsClient xdsclient.XDSClient
}

// NewClientStatusDiscoveryServer returns an implementation of the CSDS server that can be
Expand Down
13 changes: 3 additions & 10 deletions xds/csds/csds_test.go
Expand Up @@ -59,13 +59,6 @@ const (
defaultTestTimeout = 10 * time.Second
)

type xdsClientWithWatch interface {
WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func()
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
WatchEndpoints(string, func(xdsclient.EndpointsUpdate, error)) func()
}

var cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmp.Comparer(func(a, b *timestamppb.Timestamp) bool { return true }),
Expand Down Expand Up @@ -250,7 +243,7 @@ func TestCSDS(t *testing.T) {
}
}

func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
func commonSetup(t *testing.T) (xdsclient.XDSClient, *e2e.ManagementServer, string, v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, func()) {
t.Helper()

// Spin up a xDS management server on a local port.
Expand All @@ -275,7 +268,7 @@ func commonSetup(t *testing.T) (xdsClientWithWatch, *e2e.ManagementServer, strin
t.Fatalf("failed to create xds client: %v", err)
}
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClient { return xdsC }
newXDSClient = func() xdsclient.XDSClient { return xdsC }

// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()
Expand Down Expand Up @@ -635,7 +628,7 @@ func protoToJSON(p proto.Message) string {

func TestCSDSNoXDSClient(t *testing.T) {
oldNewXDSClient := newXDSClient
newXDSClient = func() xdsClient { return nil }
newXDSClient = func() xdsclient.XDSClient { return nil }
defer func() { newXDSClient = oldNewXDSClient }()

// Initialize an gRPC server and register CSDS on it.
Expand Down
8 changes: 2 additions & 6 deletions xds/googledirectpath/googlec2p.go
Expand Up @@ -62,15 +62,11 @@ const (
dnsName, xdsName = "dns", "xds"
)

type xdsClient interface {
Close()
}

// For overriding in unittests.
var (
onGCE = googlecloud.OnGCE

newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
return xdsclient.NewWithConfig(config)
}

Expand Down Expand Up @@ -139,7 +135,7 @@ func (c2pResolverBuilder) Scheme() string {

type c2pResolver struct {
resolver.Resolver
client xdsClient
client xdsclient.XDSClient
}

func (r *c2pResolver) Close() {
Expand Down
4 changes: 3 additions & 1 deletion xds/googledirectpath/googlec2p_test.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -130,6 +131,7 @@ func TestBuildNotOnGCE(t *testing.T) {
}

type testXDSClient struct {
xdsclient.XDSClient
closed chan struct{}
}

Expand Down Expand Up @@ -177,7 +179,7 @@ func TestBuildXDS(t *testing.T) {

configCh := make(chan *bootstrap.Config, 1)
oldNewClient := newClientWithConfig
newClientWithConfig = func(config *bootstrap.Config) (xdsClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
configCh <- config
return tXDSClient, nil
}
Expand Down
35 changes: 2 additions & 33 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)

const (
Expand All @@ -59,7 +58,6 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient func() (xdsClient, error)
buildProvider = buildProviderFunc
)

Expand All @@ -84,17 +82,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
}
b.logger = prefixLogger((b))
b.logger.Infof("Created")

if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsClient = client
}

var creds credentials.TransportCredentials
switch {
case opts.DialCreds != nil:
Expand Down Expand Up @@ -137,14 +124,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
return &cfg, nil
}

// xdsClient contains methods from xdsClient.Client which are used by
// the cdsBalancer. This will be faked out in unittests.
type xdsClient interface {
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
BootstrapConfig() *bootstrap.Config
Close()
}

// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
// watcher with the xdsClient, while a non-nil error causes it to cancel the
Expand Down Expand Up @@ -184,7 +163,7 @@ type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsClient // xDS client to watch Cluster resource.
xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource.
cancelWatch func() // Cluster watch cancel func.
edsLB balancer.Balancer // EDS child policy.
clusterToWatch string
Expand Down Expand Up @@ -361,15 +340,8 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
lbCfg.LrsLoadReportingServerName = new(string)

}
resolverState := resolver.State{}
// Include the xds client for the child LB policies to use. For unit
// tests, b.xdsClient may not be a full *xdsclient.Client, but it will
// always be in production.
if c, ok := b.xdsClient.(*xdsclient.Client); ok {
resolverState = xdsclient.SetClient(resolverState, c)
}
ccState := balancer.ClientConnState{
ResolverState: resolverState,
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
Expand Down Expand Up @@ -407,9 +379,6 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
if newXDSClient != nil {
b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down
Expand Up @@ -133,11 +133,7 @@ func (p *fakeProvider) Close() {
// xDSCredentials.
func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
t.Helper()

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
Expand All @@ -164,7 +160,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

// Push a ClientConnState update to the CDS balancer with a cluster name.
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}

Expand All @@ -181,8 +177,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newXDSClient = oldNewXDSClient
newEDSBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}

Expand Down
31 changes: 16 additions & 15 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
Expand Down Expand Up @@ -129,7 +128,10 @@ func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS
return err
}
gotCCS := ccs.(balancer.ClientConnState)
if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreUnexported(attributes.Attributes{})) {
if xdsclient.FromResolverState(gotCCS.ResolverState) == nil {
return fmt.Errorf("want resolver state with XDSClient attached, got one without")
}
if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")) {
return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS)
}
return nil
Expand Down Expand Up @@ -173,7 +175,7 @@ func (tb *testEDSBalancer) waitForClose(ctx context.Context) error {

// cdsCCS is a helper function to construct a good update passed from the
// xdsResolver to the cdsBalancer.
func cdsCCS(cluster string) balancer.ClientConnState {
func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {
const cdsLBConfig = `{
"loadBalancingConfig":[
{
Expand All @@ -185,9 +187,9 @@ func cdsCCS(cluster string) balancer.ClientConnState {
}`
jsonSC := fmt.Sprintf(cdsLBConfig, cluster)
return balancer.ClientConnState{
ResolverState: resolver.State{
ResolverState: xdsclient.SetClient(resolver.State{
ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC),
},
}, xdsC),
BalancerConfig: &lbConfig{ClusterName: clusterName},
}
}
Expand All @@ -211,11 +213,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientCon
// newEDSBalancer function to return it), and also returns a cleanup function.
func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
t.Helper()

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClient, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
Expand All @@ -232,7 +230,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newXDSClient = oldNewXDSClient
xdsC.Close()
}
}

Expand All @@ -242,7 +240,7 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
t.Helper()

xdsC, cdsB, edsB, tcc, cancel := setup(t)
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}

Expand All @@ -262,6 +260,9 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
// cdsBalancer with different inputs and verifies that the CDS watch API on the
// provided xdsClient is invoked appropriately.
func (s) TestUpdateClientConnState(t *testing.T) {
xdsC := fakeclient.NewClient()
defer xdsC.Close()

tests := []struct {
name string
ccs balancer.ClientConnState
Expand All @@ -280,14 +281,14 @@ func (s) TestUpdateClientConnState(t *testing.T) {
},
{
name: "happy-good-case",
ccs: cdsCCS(clusterName),
ccs: cdsCCS(clusterName, xdsC),
wantCluster: clusterName,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
xdsC, cdsB, _, _, cancel := setup(t)
_, cdsB, _, _, cancel := setup(t)
defer func() {
cancel()
cdsB.Close()
Expand Down Expand Up @@ -324,7 +325,7 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
}()

// This is the same clientConn update sent in setupWithWatch().
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}
// The above update should not result in a new watch being registered.
Expand Down Expand Up @@ -660,7 +661,7 @@ func (s) TestClose(t *testing.T) {

// Make sure that the UpdateClientConnState() method on the CDS balancer
// returns error.
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != errBalancerClosed {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != errBalancerClosed {
t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed)
}

Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Expand Up @@ -40,7 +40,7 @@ type clusterHandler struct {
// CDS Balancer cares about is the most recent update.
updateChannel chan clusterHandlerUpdate

xdsClient xdsClient
xdsClient xdsclient.XDSClient
}

func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
Expand Down Expand Up @@ -112,7 +112,7 @@ type clusterNode struct {

// CreateClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsClient, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsclient.XDSClient, topLevelHandler *clusterHandler) *clusterNode {
c := &clusterNode{
clusterHandler: topLevelHandler,
}
Expand Down