Skip to content

Commit

Permalink
xds/cluster_impl: fix cluster_impl not correctly starting LoadReport …
Browse files Browse the repository at this point in the history
…stream (#4566)
  • Loading branch information
menghanl committed Jun 29, 2021
1 parent 83f9def commit b3f274c
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 16 deletions.
113 changes: 109 additions & 4 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ import (
)

const (
	// defaultTestTimeout bounds waits for events that are expected to happen.
	defaultTestTimeout = 1 * time.Second
	// defaultShortTestTimeout bounds waits for events that are expected NOT
	// to happen, so that the negative checks do not slow the test down.
	defaultShortTestTimeout = 100 * time.Microsecond

	// Names used to populate the LBConfig under test.
	testClusterName   = "test-cluster"
	testServiceName   = "test-eds-service"
	testLRSServerName = "test-lrs-name"
)

var (
Expand Down Expand Up @@ -649,6 +651,109 @@ func TestLoadReporting(t *testing.T) {
if reqStats.InProgress != 0 {
t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
}

b.Close()
if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
}
}

// TestUpdateLRSServer covers the cases
// - the init config specifies "" as the LRS server
// - config modifies LRS server to a different string
// - config sets LRS server to nil to stop load reporting
//
// For every transition it checks both sides of the contract: the previous
// load report is canceled (when there was one) and a new one is started only
// when the new config names a server.
func TestUpdateLRSServer(t *testing.T) {
	testLocality := xdsinternal.LocalityID{
		Region:  "test-region",
		Zone:    "test-zone",
		SubZone: "test-sub-zone",
	}

	xdsC := fakeclient.NewClient()
	defer xdsC.Close()

	builder := balancer.Get(Name)
	cc := testutils.NewTestClientConn(t)
	b := builder.Build(cc, balancer.BuildOptions{})
	defer b.Close()

	addrs := make([]resolver.Address, len(testBackendAddrs))
	for i, a := range testBackendAddrs {
		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
	}

	// updateLRSServer pushes a config that differs between calls only in the
	// LRS server name, so each step of the test isolates that one field.
	updateLRSServer := func(lrsServer *string) {
		t.Helper()
		if err := b.UpdateClientConnState(balancer.ClientConnState{
			ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
			BalancerConfig: &LBConfig{
				Cluster:                 testClusterName,
				EDSServiceName:          testServiceName,
				LoadReportingServerName: lrsServer,
				ChildPolicy: &internalserviceconfig.BalancerConfig{
					Name: roundrobin.Name,
				},
			},
		}); err != nil {
			t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Initial config: a non-nil pointer to "" must still start load reporting.
	updateLRSServer(newString(""))
	got, err := xdsC.WaitForReportLoad(ctx)
	if err != nil {
		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
	}
	if got.Server != "" {
		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, "")
	}

	// Update LRS server to a different name: the old report is canceled and
	// a new one is started against the new server.
	updateLRSServer(newString(testLRSServerName))
	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
		t.Fatalf("unexpected error waiting for load report to be canceled: %v", err)
	}
	got2, err2 := xdsC.WaitForReportLoad(ctx)
	if err2 != nil {
		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
	}
	if got2.Server != testLRSServerName {
		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerName)
	}

	// Update LRS server to nil, to disable LRS: the old report is canceled
	// and no new one may be started.
	updateLRSServer(nil)
	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
		t.Fatalf("unexpected error waiting for load report to be canceled: %v", err)
	}

	shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
	defer shortCancel()
	// Anything other than a timeout means either ReportLoad was called again
	// or the wait failed for an unrelated reason; report both values so the
	// two cases can be told apart.
	if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
		t.Fatalf("WaitForReportLoad() = {%q}, %v; want error %v (no load report after LRS is disabled)", s.Server, err, context.DeadlineExceeded)
	}
}

func assertString(f func() (string, error)) string {
Expand Down
50 changes: 40 additions & 10 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type clusterImplBalancer struct {
childLB balancer.Balancer
cancelLoadReport func()
edsServiceName string
lrsServerName string
lrsServerName *string
loadWrapper *loadstore.Wrapper

clusterNameMu sync.Mutex
Expand Down Expand Up @@ -165,22 +165,48 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
b.loadWrapper.UpdateClusterAndService(clusterName, b.edsServiceName)
}

var (
stopOldLoadReport bool
startNewLoadReport bool
)

// Check if it's necessary to restart load report.
var newLRSServerName string
if newConfig.LoadReportingServerName != nil {
newLRSServerName = *newConfig.LoadReportingServerName
}
if b.lrsServerName != newLRSServerName {
// LoadReportingServerName is different, load should be reported to a
// different server, restart.
b.lrsServerName = newLRSServerName
if b.lrsServerName == nil {
if newConfig.LoadReportingServerName != nil {
// Old is nil, new is not nil, start new LRS.
b.lrsServerName = newConfig.LoadReportingServerName
startNewLoadReport = true
}
// Old is nil, new is nil, do nothing.
} else if newConfig.LoadReportingServerName == nil {
// Old is not nil, new is nil, stop old, don't start new.
b.lrsServerName = newConfig.LoadReportingServerName
stopOldLoadReport = true
} else {
// Old is not nil, new is not nil, compare string values, if
// different, stop old and start new.
if *b.lrsServerName != *newConfig.LoadReportingServerName {
b.lrsServerName = newConfig.LoadReportingServerName
stopOldLoadReport = true
startNewLoadReport = true
}
}

if stopOldLoadReport {
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
if !startNewLoadReport {
// If a new LRS stream will be started later, no need to update
// it to nil here.
b.loadWrapper.UpdateLoadStore(nil)
}
}
}
if startNewLoadReport {
var loadStore *load.Store
if b.xdsClient != nil {
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServerName)
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(*b.lrsServerName)
}
b.loadWrapper.UpdateLoadStore(loadStore)
}
Expand Down Expand Up @@ -492,6 +518,10 @@ func (b *clusterImplBalancer) run() {
}
b.mu.Unlock()
case <-b.closed.Done():
if b.cancelLoadReport != nil {
b.cancelLoadReport()
b.cancelLoadReport = nil
}
return
}
}
Expand Down
18 changes: 16 additions & 2 deletions xds/internal/testutils/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Client struct {
cdsCancelCh *testutils.Channel
edsCancelCh *testutils.Channel
loadReportCh *testutils.Channel
lrsCancelCh *testutils.Channel
loadStore *load.Store
bootstrapCfg *bootstrap.Config

Expand Down Expand Up @@ -220,7 +221,16 @@ type ReportLoadArgs struct {
// ReportLoad records the call, together with the requested server, on
// loadReportCh and returns the fake client's load store.
//
// The returned cancel function sends on lrsCancelCh each time it is invoked,
// which lets tests observe cancellation through WaitForCancelReportLoad.
func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) {
	xdsC.loadReportCh.Send(ReportLoadArgs{Server: server})
	return xdsC.loadStore, func() {
		xdsC.lrsCancelCh.Send(nil)
	}
}

// WaitForCancelReportLoad blocks until a value arrives on lrsCancelCh, i.e.
// until a load report has been cancelled. If the receive fails first (for
// example because ctx expired, in which case context.DeadlineExceeded is
// expected), that error is returned instead.
func (xdsC *Client) WaitForCancelReportLoad(ctx context.Context) error {
	if _, recvErr := xdsC.lrsCancelCh.Receive(ctx); recvErr != nil {
		return recvErr
	}
	return nil
}

// LoadStore returns the underlying load data store.
Expand All @@ -232,7 +242,10 @@ func (xdsC *Client) LoadStore() *load.Store {
// returns the arguments passed to it.
func (xdsC *Client) WaitForReportLoad(ctx context.Context) (ReportLoadArgs, error) {
	val, err := xdsC.loadReportCh.Receive(ctx)
	if err != nil {
		// Return before the type assertion: when Receive fails there may be
		// no ReportLoadArgs in val, and an unchecked assertion on a nil
		// interface value would panic instead of surfacing err.
		return ReportLoadArgs{}, err
	}
	return val.(ReportLoadArgs), nil
}

// Close fires xdsC.Closed, indicating it was called.
Expand Down Expand Up @@ -275,6 +288,7 @@ func NewClientWithName(name string) *Client {
cdsCancelCh: testutils.NewChannelWithSize(10),
edsCancelCh: testutils.NewChannel(),
loadReportCh: testutils.NewChannel(),
lrsCancelCh: testutils.NewChannel(),
loadStore: load.NewStore(),
cdsCbs: make(map[string]func(xdsclient.ClusterUpdate, error)),
Closed: grpcsync.NewEvent(),
Expand Down

0 comments on commit b3f274c

Please sign in to comment.