From cd433d57183f3a3ff8b4c2b83a9bc46c327386e1 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 3 Jun 2021 11:43:17 -0700 Subject: [PATCH] [xds_client_in_attributes] cds --- .../balancer/cdsbalancer/cdsbalancer.go | 26 +------------------ .../cdsbalancer/cdsbalancer_security_test.go | 8 ++---- .../balancer/cdsbalancer/cdsbalancer_test.go | 23 ++++++++-------- .../balancer/cdsbalancer/cluster_handler.go | 4 +-- 4 files changed, 17 insertions(+), 44 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 1e6b7a2c08c2..7ee8ad8d2de9 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -36,7 +36,6 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/edsbalancer" xdsclient "google.golang.org/grpc/xds/internal/client" - "google.golang.org/grpc/xds/internal/client/bootstrap" ) const ( @@ -59,7 +58,6 @@ var ( // not deal with subConns. return builder.Build(cc, opts), nil } - newXDSClient func() (xdsClientInterface, error) buildProvider = buildProviderFunc ) @@ -85,17 +83,6 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer. } b.logger = prefixLogger((b)) b.logger.Infof("Created") - - if newXDSClient != nil { - // For tests - client, err := newXDSClient() - if err != nil { - b.logger.Errorf("failed to create xds-client: %v", err) - return nil - } - b.xdsClient = client - } - var creds credentials.TransportCredentials switch { case opts.DialCreds != nil: @@ -138,14 +125,6 @@ func (cdsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, return &cfg, nil } -// xdsClientInterface contains methods from xdsClient.Client which are used by -// the cdsBalancer. This will be faked out in unittests. -type xdsClientInterface interface { - WatchCluster(string, func(xdsclient.ClusterUpdate, error)) func() - BootstrapConfig() *bootstrap.Config - Close() -} - // ccUpdate wraps a clientConn update received from gRPC (pushed from the // xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS // watcher with the xdsClient, while a non-nil error causes it to cancel the @@ -185,7 +164,7 @@ type cdsBalancer struct { ccw *ccWrapper // ClientConn interface passed to child LB. bOpts balancer.BuildOptions // BuildOptions passed to child LB. updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates. - xdsClient xdsClientInterface // xDS client to watch Cluster resource. + xdsClient xdsclient.Interface // xDS client to watch Cluster resource. cancelWatch func() // Cluster watch cancel func. edsLB balancer.Balancer // EDS child policy. clusterToWatch string @@ -408,9 +387,6 @@ func (b *cdsBalancer) run() { b.edsLB.Close() b.edsLB = nil } - if newXDSClient != nil { - b.xdsClient.Close() - } if b.cachedRoot != nil { b.cachedRoot.Close() } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index 52b1a05f1362..f737e9bfa734 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -133,11 +133,7 @@ func (p *fakeProvider) Close() { // xDSCredentials. func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) { t.Helper() - xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil } - builder := balancer.Get(cdsName) if builder == nil { t.Fatalf("balancer.Get(%q) returned nil", cdsName) @@ -164,7 +160,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS } // Push a ClientConnState update to the CDS balancer with a cluster name. - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) } @@ -181,8 +177,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS } return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { - newXDSClient = oldNewXDSClient newEDSBalancer = oldEDSBalancerBuilder + xdsC.Close() } } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index fba3ee1531e9..bc272a7e7e2e 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -174,7 +174,7 @@ func (tb *testEDSBalancer) waitForClose(ctx context.Context) error { // cdsCCS is a helper function to construct a good update passed from the // xdsResolver to the cdsBalancer. -func cdsCCS(cluster string) balancer.ClientConnState { +func cdsCCS(cluster string, xdsC xdsclient.Interface) balancer.ClientConnState { const cdsLBConfig = `{ "loadBalancingConfig":[ { @@ -186,9 +186,9 @@ func cdsCCS(cluster string) balancer.ClientConnState { }` jsonSC := fmt.Sprintf(cdsLBConfig, cluster) return balancer.ClientConnState{ - ResolverState: resolver.State{ + ResolverState: xdsclient.SetClient(resolver.State{ ServiceConfig: internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC), - }, + }, xdsC), BalancerConfig: &lbConfig{ClusterName: clusterName}, } } @@ -214,8 +214,6 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x t.Helper() xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil } builder := balancer.Get(cdsName) if builder == nil { @@ -233,7 +231,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() { newEDSBalancer = oldEDSBalancerBuilder - newXDSClient = oldNewXDSClient + xdsC.Close() } } @@ -243,7 +241,7 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal t.Helper() xdsC, cdsB, edsB, tcc, cancel := setup(t) - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) } @@ -263,6 +261,9 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal // cdsBalancer with different inputs and verifies that the CDS watch API on the // provided xdsClient is invoked appropriately. func (s) TestUpdateClientConnState(t *testing.T) { + xdsC := fakeclient.NewClient() + defer xdsC.Close() + tests := []struct { name string ccs balancer.ClientConnState @@ -281,14 +282,14 @@ func (s) TestUpdateClientConnState(t *testing.T) { }, { name: "happy-good-case", - ccs: cdsCCS(clusterName), + ccs: cdsCCS(clusterName, xdsC), wantCluster: clusterName, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - xdsC, cdsB, _, _, cancel := setup(t) + _, cdsB, _, _, cancel := setup(t) defer func() { cancel() cdsB.Close() @@ -325,7 +326,7 @@ func (s) TestUpdateClientConnStateWithSameState(t *testing.T) { }() // This is the same clientConn update sent in setupWithWatch(). - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != nil { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != nil { t.Fatalf("cdsBalancer.UpdateClientConnState failed with error: %v", err) } // The above update should not result in a new watch being registered. @@ -661,7 +662,7 @@ func (s) TestClose(t *testing.T) { // Make sure that the UpdateClientConnState() method on the CDS balancer // returns error. - if err := cdsB.UpdateClientConnState(cdsCCS(clusterName)); err != errBalancerClosed { + if err := cdsB.UpdateClientConnState(cdsCCS(clusterName, xdsC)); err != errBalancerClosed { t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed) } diff --git a/xds/internal/balancer/cdsbalancer/cluster_handler.go b/xds/internal/balancer/cdsbalancer/cluster_handler.go index 4c13b55594da..bdaf7ddaa958 100644 --- a/xds/internal/balancer/cdsbalancer/cluster_handler.go +++ b/xds/internal/balancer/cdsbalancer/cluster_handler.go @@ -40,7 +40,7 @@ type clusterHandler struct { // CDS Balancer cares about is the most recent update. updateChannel chan clusterHandlerUpdate - xdsClient xdsClientInterface + xdsClient xdsclient.Interface } func (ch *clusterHandler) updateRootCluster(rootClusterName string) { @@ -112,7 +112,7 @@ type clusterNode struct { // CreateClusterNode creates a cluster node from a given clusterName. This will // also start the watch for that cluster. -func createClusterNode(clusterName string, xdsClient xdsClientInterface, topLevelHandler *clusterHandler) *clusterNode { +func createClusterNode(clusterName string, xdsClient xdsclient.Interface, topLevelHandler *clusterHandler) *clusterNode { c := &clusterNode{ clusterHandler: topLevelHandler, }