Skip to content

Commit

Permalink
[xds_client_in_attributes] eds
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 3, 2021
1 parent ea6986d commit e8143a9
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 31 deletions.
25 changes: 1 addition & 24 deletions xds/internal/balancer/edsbalancer/eds.go
Expand Up @@ -41,19 +41,10 @@ import (

const edsName = "eds_experimental"

// xdsClientInterface contains only the xds_client methods needed by EDS
// balancer. It's defined so we can override xdsclient.New function in tests.
type xdsClientInterface interface {
WatchEndpoints(clusterName string, edsCb func(xdsclient.EndpointsUpdate, error)) (cancel func())
ReportLoad(server string) (loadStore *load.Store, cancel func())
Close()
}

var (
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
}
newXDSClient func() (xdsClientInterface, error)
)

func init() {
Expand All @@ -75,17 +66,6 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
config: &EDSConfig{},
}
x.logger = prefixLogger(x)

if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
x.logger.Errorf("xds: failed to create xds-client: %v", err)
return nil
}
x.xdsClient = client
}

x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger)
x.logger.Infof("Created")
go x.run()
Expand Down Expand Up @@ -145,7 +125,7 @@ type edsBalancer struct {
xdsClientUpdate chan *edsUpdate
childPolicyUpdate *buffer.Unbounded

xdsClient xdsClientInterface
xdsClient xdsclient.Interface
loadWrapper *loadstore.Wrapper
config *EDSConfig // may change when passed a different service config
edsImpl edsBalancerImplInterface
Expand Down Expand Up @@ -175,9 +155,6 @@ func (x *edsBalancer) run() {
x.edsImpl.updateState(u.priority, u.s)
case <-x.closed.Done():
x.cancelWatch()
if newXDSClient != nil {
x.xdsClient.Close()
}
x.edsImpl.close()
x.logger.Infof("Shutdown")
x.done.Fire()
Expand Down
19 changes: 15 additions & 4 deletions xds/internal/balancer/edsbalancer/eds_test.go
Expand Up @@ -259,8 +259,6 @@ func waitForNewEDSLB(ctx context.Context, ch *testutils.Channel) (*fakeEDSBalanc
// cleanup.
func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }

origNewEDSBalancer := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, _ balancer.BuildOptions, _ func(priorityType, balancer.State), _ load.PerClusterReporter, _ *grpclog.PrefixLogger) edsBalancerImplInterface {
Expand All @@ -270,7 +268,7 @@ func setup(edsLBCh *testutils.Channel) (*fakeclient.Client, func()) {
}
return xdsC, func() {
newEDSBalancer = origNewEDSBalancer
newXDSClient = oldNewXDSClient
xdsC.Close()
}
}

Expand Down Expand Up @@ -352,6 +350,7 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) {
Config: json.RawMessage("{}"),
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ChildPolicy: lbCfgA,
ClusterName: testEDSClusterName,
Expand Down Expand Up @@ -381,6 +380,7 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) {
Config: json.RawMessage("{}"),
}
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ChildPolicy: lbCfgB,
ClusterName: testEDSClusterName,
Expand Down Expand Up @@ -425,6 +425,7 @@ func (s) TestSubConnStateChange(t *testing.T) {
}

if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
Expand Down Expand Up @@ -471,6 +472,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
}

if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -515,6 +517,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {

// An update with the same service name should not trigger a new watch.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -553,6 +556,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
}

if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -593,6 +597,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
// An update with the same service name should trigger a new watch, because
// the previous watch was canceled.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -644,6 +649,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
defer cancel()
// If eds service name is not set, should watch for cluster name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{ClusterName: "cluster-1"},
}); err != nil {
t.Fatal(err)
Expand All @@ -655,6 +661,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
// Update with an non-empty edsServiceName should trigger an EDS watch for
// the same.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"},
}); err != nil {
t.Fatal(err)
Expand All @@ -668,6 +675,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
// registered watch will be cancelled, which will result in an EDS request
// with no resource names being sent to the server.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"},
}); err != nil {
t.Fatal(err)
Expand All @@ -681,7 +689,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
// service name from an update's config.
func (s) TestCounterUpdate(t *testing.T) {
edsLBCh := testutils.NewChannel()
_, cleanup := setup(edsLBCh)
xdsC, cleanup := setup(edsLBCh)
defer cleanup()

builder := balancer.Get(edsName)
Expand All @@ -694,6 +702,7 @@ func (s) TestCounterUpdate(t *testing.T) {
var testCountMax uint32 = 100
// Update should trigger counter update with provided service name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: "foobar-1",
MaxConcurrentRequests: &testCountMax,
Expand Down Expand Up @@ -728,6 +737,7 @@ func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) {

// Update should trigger counter update with provided service name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: "foobar-1",
},
Expand All @@ -747,6 +757,7 @@ func (s) TestClusterNameUpdateInAddressAttributes(t *testing.T) {

// Update should trigger counter update with provided service name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: "foobar-2",
},
Expand Down
7 changes: 4 additions & 3 deletions xds/internal/balancer/edsbalancer/xds_lrs_test.go
Expand Up @@ -25,6 +25,8 @@ import (
"testing"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/resolver"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
)

Expand All @@ -33,9 +35,7 @@ import (
// server (empty string).
func (s) TestXDSLoadReporting(t *testing.T) {
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
defer xdsC.Close()

builder := balancer.Get(edsName)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{})
Expand All @@ -45,6 +45,7 @@ func (s) TestXDSLoadReporting(t *testing.T) {
defer edsB.Close()

if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
EDSServiceName: testEDSClusterName,
LrsLoadReportingServerName: new(string),
Expand Down

0 comments on commit e8143a9

Please sign in to comment.