Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: always backoff between new streams even after successful stream #5280

Merged
merged 3 commits into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/authority.go
Expand Up @@ -108,7 +108,7 @@ func (c *clientImpl) newAuthority(config *bootstrap.ServerConfig) (_ *authority,
ret.close()
}
}()
ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger)
ctr, err := newController(config, ret.pubsub, c.updateValidator, c.logger, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/client_test.go
Expand Up @@ -102,7 +102,7 @@ type testController struct {
func overrideNewController(t *testing.T) *testutils.Channel {
origNewController := newController
ch := testutils.NewChannel()
newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (controllerInterface, error) {
newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, _ func(int) time.Duration) (controllerInterface, error) {
ret := newTestController(config)
ch.Send(ret)
return ret, nil
Expand Down
6 changes: 4 additions & 2 deletions xds/internal/xdsclient/controller.go
Expand Up @@ -18,6 +18,8 @@
package xdsclient

import (
"time"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/controller"
Expand All @@ -33,6 +35,6 @@ type controllerInterface interface {
Close()
}

var newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (controllerInterface, error) {
return controller.New(config, pubsub, validator, logger)
var newController = func(config *bootstrap.ServerConfig, pubsub *pubsub.Pubsub, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (controllerInterface, error) {
return controller.New(config, pubsub, validator, logger, boff)
}
7 changes: 5 additions & 2 deletions xds/internal/xdsclient/controller/controller.go
Expand Up @@ -100,7 +100,7 @@ func SetGRPCDial(dialer func(target string, opts ...grpc.DialOption) (*grpc.Clie
}

// New creates a new controller.
func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger) (_ *Controller, retErr error) {
func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, validator xdsresource.UpdateValidatorFunc, logger *grpclog.PrefixLogger, boff func(int) time.Duration) (_ *Controller, retErr error) {
switch {
case config == nil:
return nil, errors.New("xds: no xds_server provided")
Expand All @@ -120,12 +120,15 @@ func New(config *bootstrap.ServerConfig, updateHandler pubsub.UpdateHandler, val
}),
}

if boff == nil {
boff = backoff.DefaultExponential.Backoff
}
ret := &Controller{
config: config,
updateValidator: validator,
updateHandler: updateHandler,

backoff: backoff.DefaultExponential.Backoff, // TODO: should this be configurable?
backoff: boff,
streamCh: make(chan grpc.ClientStream, 1),
sendCh: buffer.NewUnbounded(),
watchMap: make(map[xdsresource.ResourceType]map[string]bool),
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/controller/controller_test.go
Expand Up @@ -95,7 +95,7 @@ func (s) TestNew(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c, err := New(test.config, noopUpdateHandler, nil, nil) // Only testing the config, other inputs are left as nil.
c, err := New(test.config, noopUpdateHandler, nil, nil, nil) // Only testing the config, other inputs are left as nil.
defer func() {
if c != nil {
c.Close()
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) {

// Set the dialer and make sure it is called.
SetGRPCDial(customDialer)
c, err := New(config, noopUpdateHandler, nil, nil)
c, err := New(config, noopUpdateHandler, nil, nil, nil)
if err != nil {
t.Fatalf("New(%+v) = %v, want no error", config, err)
}
Expand All @@ -138,7 +138,7 @@ func (s) TestNewWithGRPCDial(t *testing.T) {

// Reset the dialer and make sure it is not called.
SetGRPCDial(grpc.Dial)
c, err = New(config, noopUpdateHandler, nil, nil)
c, err = New(config, noopUpdateHandler, nil, nil, nil)
defer func() {
if c != nil {
c.Close()
Expand Down
36 changes: 14 additions & 22 deletions xds/internal/xdsclient/controller/transport.go
Expand Up @@ -59,26 +59,21 @@ func (t *Controller) run(ctx context.Context) {
// report error (and log) when stats is transient failure.

retries := 0
for {
select {
case <-ctx.Done():
return
default:
}

if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
lastStreamStartTime := time.Time{}
for ctx.Err() == nil {
dur := time.Until(lastStreamStartTime.Add(t.backoff(retries)))
if dur > 0 {
timer := time.NewTimer(dur)
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return
}
}

retries++
lastStreamStartTime = time.Now()
stream, err := t.vClient.NewStream(ctx, t.cc)
if err != nil {
t.updateHandler.NewConnectionError(err)
Expand Down Expand Up @@ -370,24 +365,21 @@ func (t *Controller) processAckInfo(ack *ackAction, stream grpc.ClientStream) (t
// It blocks until the context is cancelled.
func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts controllerversion.LoadReportingOptions) {
retries := 0
for {
if ctx.Err() != nil {
return
}

if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
lastStreamStartTime := time.Time{}
for ctx.Err() == nil {
dur := time.Until(lastStreamStartTime.Add(t.backoff(retries)))
if dur > 0 {
timer := time.NewTimer(dur)
select {
case <-timer.C:
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
timer.Stop()
return
}
}

retries++
lastStreamStartTime = time.Now()
stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
if err != nil {
t.logger.Warningf("lrs: failed to create stream: %v", err)
Expand Down
5 changes: 1 addition & 4 deletions xds/internal/xdsclient/controller/v2_testutils_test.go
Expand Up @@ -458,13 +458,10 @@ func newTestController(p pubsub.UpdateHandler, controlPlanAddr string, n *basepb
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
TransportAPI: version.TransportV2,
NodeProto: n,
}, p, nil, l)
}, p, nil, l, b)
if err != nil {
return nil, err
}
// This direct setting backoff seems a bit hacky, but should be OK for the
// tests. Or we need to make it configurable in New().
c.backoff = b
return c, nil
}

Expand Down