Skip to content

Commit

Permalink
[xds_client_in_attributes] cluster impl
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 3, 2021
1 parent 9b7e429 commit ea6986d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 61 deletions.
48 changes: 12 additions & 36 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ func init() {
func TestDropByCategory(t *testing.T) {
defer client.ClearCounterForTesting(testClusterName)
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
defer xdsC.Close()

builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
Expand All @@ -89,9 +87,7 @@ func TestDropByCategory(t *testing.T) {
dropDenominator = 2
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
Expand Down Expand Up @@ -176,9 +172,7 @@ func TestDropByCategory(t *testing.T) {
dropDenominator2 = 4
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
Expand Down Expand Up @@ -232,9 +226,7 @@ func TestDropByCategory(t *testing.T) {
func TestDropCircuitBreaking(t *testing.T) {
defer client.ClearCounterForTesting(testClusterName)
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
defer xdsC.Close()

builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
Expand All @@ -243,9 +235,7 @@ func TestDropCircuitBreaking(t *testing.T) {

var maxRequest uint32 = 50
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
Expand Down Expand Up @@ -344,19 +334,15 @@ func TestDropCircuitBreaking(t *testing.T) {
func TestPickerUpdateAfterClose(t *testing.T) {
defer client.ClearCounterForTesting(testClusterName)
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
defer xdsC.Close()

builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})

var maxRequest uint32 = 50
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
Expand Down Expand Up @@ -389,19 +375,15 @@ func TestPickerUpdateAfterClose(t *testing.T) {
func TestClusterNameInAddressAttributes(t *testing.T) {
defer client.ClearCounterForTesting(testClusterName)
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
defer xdsC.Close()

builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
Expand Down Expand Up @@ -450,9 +432,7 @@ func TestClusterNameInAddressAttributes(t *testing.T) {
const testClusterName2 = "test-cluster-2"
var addr2 = resolver.Address{Addr: "2.2.2.2"}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: []resolver.Address{addr2},
},
ResolverState: client.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName2,
EDSServiceName: testServiceName,
Expand Down Expand Up @@ -480,19 +460,15 @@ func TestClusterNameInAddressAttributes(t *testing.T) {
func TestReResolution(t *testing.T) {
defer client.ClearCounterForTesting(testClusterName)
xdsC := fakeclient.NewClient()
oldNewXDSClient := newXDSClient
newXDSClient = func() (xdsClientInterface, error) { return xdsC, nil }
defer func() { newXDSClient = oldNewXDSClient }()
defer xdsC.Close()

builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: testBackendAddrs,
},
ResolverState: client.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
Expand Down
25 changes: 1 addition & 24 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func init() {
balancer.Register(clusterImplBB{})
}

var newXDSClient func() (xdsClientInterface, error)

type clusterImplBB struct{}

func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
Expand All @@ -67,18 +65,7 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions)
requestCountMax: defaultRequestCountMax,
}
b.logger = prefixLogger(b)

if newXDSClient != nil {
// For tests
client, err := newXDSClient()
if err != nil {
b.logger.Errorf("failed to create xds-client: %v", err)
return nil
}
b.xdsC = client
}
go b.run()

b.logger.Infof("Created")
return b
}
Expand All @@ -91,13 +78,6 @@ func (clusterImplBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancing
return parseConfig(c)
}

// xdsClientInterface contains only the xds_client methods needed by LRS
// balancer. It's defined so we can override xdsclient in tests.
type xdsClientInterface interface {
ReportLoad(server string) (*load.Store, func())
Close()
}

type clusterImplBalancer struct {
balancer.ClientConn

Expand All @@ -115,7 +95,7 @@ type clusterImplBalancer struct {

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
xdsC xdsClientInterface
xdsC xdsclient.Interface

config *LBConfig
childLB balancer.Balancer
Expand Down Expand Up @@ -328,9 +308,6 @@ func (cib *clusterImplBalancer) Close() {
cib.childLB.Close()
cib.childLB = nil
}
if newXDSClient != nil {
cib.xdsC.Close()
}
<-cib.done.Done()
cib.logger.Infof("Shutdown")
}
Expand Down
1 change: 0 additions & 1 deletion xds/internal/balancer/lrs/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")

return b
}

Expand Down

0 comments on commit ea6986d

Please sign in to comment.