Skip to content

Commit

Permalink
[xds_client_in_attributes] cds
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 3, 2021
1 parent e8143a9 commit cd433d5
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 44 deletions.
26 changes: 1 addition & 25 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)

const (
Expand All @@ -59,7 +58,6 @@ var (
// not deal with subConns.
return builder.Build(cc, opts), nil
}
newXDSClient func() (xdsClientInterface, error)
buildProvider = buildProviderFunc
)

Expand All @@ -85,17 +83,6 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
}
b.logger = prefixLogger((b))
b.logger.Infof("Created")

if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsClient = client
}

var creds credentials.TransportCredentials
switch {
case opts.DialCreds != nil:
Expand Down Expand Up @@ -138,14 +125,6 @@ func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
return &cfg, nil
}

// xdsClientInterface contains methods from xdsClient.Client which are used by
// the cdsBalancer. This will be faked out in unittests.
type xdsClientInterface interface {
WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func()
BootstrapConfig() *bootstrap.Config
Close()
}

// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
// watcher with the xdsClient, while a non-nil error causes it to cancel the
Expand Down Expand Up @@ -185,7 +164,7 @@ type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsClientInterface // xDS client to watch Cluster resource.
xdsClient xdsclient.Interface // xDS client to watch Cluster resource.
cancelWatch func() // Cluster watch cancel func.
edsLB balancer.Balancer // EDS child policy.
clusterToWatch string
Expand Down Expand Up @@ -408,9 +387,6 @@ func (b *cdsBalancer) run() {
b.edsLB.Close()
b.edsLB = nil
}
if newXDSClient != nil {
b.xdsClient.Close()
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
}
Expand Down
Expand Up @@ -133,11 +133,7 @@ func (p *fakeProvider) Close() {
// xDSCredentials.
func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
t.Helper()

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
Expand All @@ -164,7 +160,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

// Push a ClientConnState update to the CDS balancer with a cluster name.
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}

Expand All @@ -181,8 +177,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newXDSClient = oldNewXDSClient
newEDSBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}

Expand Down
23 changes: 12 additions & 11 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Expand Up @@ -174,7 +174,7 @@ func (tb *testEDSBalancer) waitForClose(ctx context.Context) error {

// cdsCCS is a helper function to construct a good update passed from the
// xdsResolver to the cdsBalancer.
func cdsCCS(cluster string) balancer.ClientConnState {
func cdsCCS(cluster string, xdsC xdsclient.Interface) balancer.ClientConnState {
const cdsLBConfig = `{
"loadBalancingConfig":[
{
Expand All @@ -186,9 +186,9 @@ func cdsCCS(cluster string) balancer.ClientConnState {
}`
jsonSC := fmt.Sprintf(cdsLBConfig, cluster)
return balancer.ClientConnState{
ResolverState: resolver.State{
ResolverState: xdsclient.SetClient(resolver.State{
ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC),
},
}, xdsC),
BalancerConfig: &lbConfig{ClusterName: clusterName},
}
}
Expand All @@ -214,8 +214,6 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x
t.Helper()

xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }

builder := balancer.Get(cdsName)
if builder == nil {
Expand All @@ -233,7 +231,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x

return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newXDSClient = oldNewXDSClient
xdsC.Close()
}
}

Expand All @@ -243,7 +241,7 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
t.Helper()

xdsC, cdsB, edsB, tcc, cancel := setup(t)
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}

Expand All @@ -263,6 +261,9 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
// cdsBalancer with different inputs and verifies that the CDS watch API on the
// provided xdsClient is invoked appropriately.
func (s) TestUpdateClientConnState(t *testing.T) {
xdsC := fakeclient.NewClient()
defer xdsC.Close()

tests := []struct {
name string
ccs balancer.ClientConnState
Expand All @@ -281,14 +282,14 @@ func (s) TestUpdateClientConnState(t *testing.T) {
},
{
name: "happy-good-case",
ccs: cdsCCS(clusterName),
ccs: cdsCCS(clusterName, xdsC),
wantCluster: clusterName,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
xdsC, cdsB, _, _, cancel := setup(t)
_, cdsB, _, _, cancel := setup(t)
defer func() {
cancel()
cdsB.Close()
Expand Down Expand Up @@ -325,7 +326,7 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) {
}()

// This is the same clientConn update sent in setupWithWatch().
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil {
t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err)
}
// The above update should not result in a new watch being registered.
Expand Down Expand Up @@ -661,7 +662,7 @@ func (s) TestClose(t *testing.T) {

// Make sure that the UpdateClientConnState() method on the CDS balancer
// returns error.
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != errBalancerClosed {
if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != errBalancerClosed {
t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed)
}

Expand Down
4 changes: 2 additions & 2 deletions xds/internal/balancer/cdsbalancer/cluster_handler.go
Expand Up @@ -40,7 +40,7 @@ type clusterHandler struct {
// CDS Balancer cares about is the most recent update.
updateChannel chan clusterHandlerUpdate

xdsClient xdsClientInterface
xdsClient xdsclient.Interface
}

func (ch *clusterHandler) updateRootCluster(rootClusterName string) {
Expand Down Expand Up @@ -112,7 +112,7 @@ type clusterNode struct {

// CreateClusterNode creates a cluster node from a given clusterName. This will
// also start the watch for that cluster.
func createClusterNode(clusterName string, xdsClient xdsClientInterface, topLevelHandler *clusterHandler) *clusterNode {
func createClusterNode(clusterName string, xdsClient xdsclient.Interface, topLevelHandler *clusterHandler) *clusterNode {
c := &clusterNode{
clusterHandler: topLevelHandler,
}
Expand Down

0 comments on commit cd433d5

Please sign in to comment.