diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 21e3f43be74e..8b1cea7bbdcd 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -41,19 +41,10 @@ import ( const edsName = "eds_experimental" -// xdsClientInterface contains only the xds_client methods needed by EDS -// balancer. It's defined so we can override xdsclient.New function in tests. -type xdsClientInterface interface { - WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func()) - ReportLoad(server string) (loadStore *load.Store, cancel func()) - Close() -} - var ( newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface { return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger) } - newXDSClient func() (xdsClientInterface, error) ) func init() { @@ -75,17 +66,6 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp config: &EDSConfig{}, } x.logger = prefixLogger(x) - - if newXDSClient != nil { - // For tests - client, err := newXDSClient() - if err != nil { - x.logger.Errorf("xds: failed to create xds-client: %v", err) - return nil - } - x.xdsClient = client - } - x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger) x.logger.Infof("Created") go x.run() @@ -145,7 +125,7 @@ type edsBalancer struct { xdsClientUpdate chan *edsUpdate childPolicyUpdate *buffer.Unbounded - xdsClient xdsClientInterface + xdsClient xdsclient.Interface loadWrapper *loadstore.Wrapper config *EDSConfig // may change when passed a different service config edsImpl edsBalancerImplInterface @@ -175,9 +155,6 @@ func (x *edsBalancer) run() { x.edsImpl.updateState(u.priority, u.s) case <-x.closed.Done(): x.cancelWatch() - if newXDSClient != nil { - x.xdsClient.Close() - } x.edsImpl.close() x.logger.Infof("Shutdown") x.done.Fire() diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 3fe66098973c..d6713f9f048a 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -259,8 +259,6 @@ func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalanc // cleanup. func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) { xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil } origNewEDSBalancer := newEDSBalancer newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface { @@ -270,7 +268,7 @@ func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) { } return xdsC, func() { newEDSBalancer = origNewEDSBalancer - newXDSClient = oldNewXDSClient + xdsC.Close() } } @@ -352,6 +350,7 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { Config: json.RawMessage("{}"), } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ChildPolicy: lbCfgA, ClusterName: testEDSClusterName, @@ -381,6 +380,7 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { Config: json.RawMessage("{}"), } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ChildPolicy: lbCfgB, ClusterName: testEDSClusterName, @@ -425,6 +425,7 @@ func (s) TestSubConnStateChange(t *testing.T) { } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatalf("edsB.UpdateClientConnState() failed: %v", err) @@ -471,6 +472,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -515,6 +517,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) { // An update with the same service name should not trigger a new watch. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -553,6 +556,7 @@ func (s) TestErrorFromResolver(t *testing.T) { } if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -593,6 +597,7 @@ func (s) TestErrorFromResolver(t *testing.T) { // An update with the same service name should trigger a new watch, because // the previous watch was canceled. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: testServiceName}, }); err != nil { t.Fatal(err) @@ -644,6 +649,7 @@ func (s) TestClientWatchEDS(t *testing.T) { defer cancel() // If eds service name is not set, should watch for cluster name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ClusterName: "cluster-1"}, }); err != nil { t.Fatal(err) @@ -655,6 +661,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // Update with an non-empty edsServiceName should trigger an EDS watch for // the same. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"}, }); err != nil { t.Fatal(err) @@ -668,6 +675,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // registered watch will be cancelled, which will result in an EDS request // with no resource names being sent to the server. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"}, }); err != nil { t.Fatal(err) @@ -681,7 +689,7 @@ func (s) TestClientWatchEDS(t *testing.T) { // service name from an update's config. func (s) TestCounterUpdate(t *testing.T) { edsLBCh := testutils.NewChannel() - _, cleanup := setup(edsLBCh) + xdsC, cleanup := setup(edsLBCh) defer cleanup() builder := balancer.Get(edsName) @@ -694,6 +702,7 @@ func (s) TestCounterUpdate(t *testing.T) { var testCountMax uint32 = 100 // Update should trigger counter update with provided service name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ClusterName: "foobar-1", MaxConcurrentRequests: &testCountMax, @@ -728,6 +737,7 @@ func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) { // Update should trigger counter update with provided service name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ClusterName: "foobar-1", }, @@ -747,6 +757,7 @@ func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) { // Update should trigger counter update with provided service name. if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ ClusterName: "foobar-2", }, diff --git a/xds/internal/balancer/edsbalancer/xds_lrs_test.go b/xds/internal/balancer/edsbalancer/xds_lrs_test.go index 8b7aab657667..86e36fceaedf 100644 --- a/xds/internal/balancer/edsbalancer/xds_lrs_test.go +++ b/xds/internal/balancer/edsbalancer/xds_lrs_test.go @@ -25,6 +25,8 @@ import ( "testing" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/testutils/fakeclient" ) @@ -33,9 +35,7 @@ import ( // server (empty string). func (s) TestXDSLoadReporting(t *testing.T) { xdsC := fakeclient.NewClient() - oldNewXDSClient := newXDSClient - newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil } - defer func() { newXDSClient = oldNewXDSClient }() + defer xdsC.Close() builder := balancer.Get(edsName) edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{}) @@ -45,6 +45,7 @@ func (s) TestXDSLoadReporting(t *testing.T) { defer edsB.Close() if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &EDSConfig{ EDSServiceName: testEDSClusterName, LrsLoadReportingServerName: new(string),