Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement missing pieces for connection backoff. #2821

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions backoff.go
Expand Up @@ -27,12 +27,37 @@ import (

// DefaultBackoffConfig is the backoff configuration populated with the
// values given in the gRPC connection backoff document:
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
// Only MaxDelay is set here, to 120 seconds.
//
// Deprecated: use ConnectParams instead.
var DefaultBackoffConfig = BackoffConfig{MaxDelay: 120 * time.Second}

// BackoffConfig holds the settings of the default gRPC backoff strategy.
//
// Deprecated: use ConnectParams instead.
type BackoffConfig struct {
	// MaxDelay caps the backoff delay: no computed delay should exceed it.
	MaxDelay time.Duration
}

// ConnectParams defines the parameters for connecting and retrying. Users
// are encouraged to use this instead of BackoffConfig. See
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
//
// This API is EXPERIMENTAL.
//
// NOTE(review): a reviewer asked whether all the Backoff* fields below could
// be replaced by a single BackoffConfig field, with these fields (excluding
// MinConnectTimeout) added to BackoffConfig instead. Passing BackoffConfig to
// Exponential directly would need a new, non-internal backoff package because
// of circular dependency issues. Not resolved in this revision.
type ConnectParams struct {
	// BackoffBaseDelay is the amount of time to backoff after the first
	// connection failure.
	BackoffBaseDelay time.Duration
	// BackoffMultiplier is the factor with which to multiply backoffs after a
	// failed retry. Should ideally be greater than 1.
	BackoffMultiplier float64
	// BackoffJitter is the factor with which backoffs are randomized.
	BackoffJitter float64
	// BackoffMaxDelay is the upper bound of backoff delay.
	BackoffMaxDelay time.Duration
	// MinConnectTimeout is the minimum amount of time we are willing to give a
	// connection to complete.
	MinConnectTimeout time.Duration
}
8 changes: 3 additions & 5 deletions balancer/grpclb/grpclb.go
Expand Up @@ -50,16 +50,14 @@ const (
)

var (
// defaultBackoffConfig configures the backoff strategy that's used when the
// defaultBackoffStrategy configures the backoff strategy that's used when the
// init handshake in the RPC is unsuccessful. It's not for the clientconn
// reconnect backoff.
//
// It has the same value as the default grpc.DefaultBackoffConfig.
//
// TODO: make backoff configurable.
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
defaultBackoffStrategy = backoff.NewExponentialBuilder().Build()
errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
)

Expand Down Expand Up @@ -155,7 +153,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
scStates: make(map[balancer.SubConn]connectivity.State),
picker: &errPicker{err: balancer.ErrNoSubConnAvailable},
clientStats: newRPCStats(),
backoff: defaultBackoffConfig, // TODO: make backoff configurable.
backoff: defaultBackoffStrategy, // TODO: make backoff configurable.
}

var err error
Expand Down
29 changes: 29 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
@@ -0,0 +1,29 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package weightedroundrobin defines a weighted roundrobin balancer.
package weightedroundrobin

// Name identifies the weighted_round_robin balancer.
const Name = "weighted_round_robin"

// AddrInfo is the value stored in an Address's metadata so that the address
// can be used with the weighted roundrobin balancer.
type AddrInfo struct {
	// Weight is the load-balancing weight associated with the address.
	Weight uint32
}
12 changes: 10 additions & 2 deletions balancer/xds/edsbalancer/edsbalancer.go
Expand Up @@ -27,6 +27,7 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/balancer/xds/internal"
edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
endpointpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/endpoint"
Expand Down Expand Up @@ -227,9 +228,16 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment)
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
newAddrs = append(newAddrs, resolver.Address{
address := resolver.Address{
Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))),
})
}
if xdsB.subBalancerBuilder.Name() == weightedroundrobin.Name &&
lbEndpoint.GetLoadBalancingWeight().GetValue() != 0 {
address.Metadata = &weightedroundrobin.AddrInfo{
Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(),
}
}
newAddrs = append(newAddrs, address)
}
var weightChanged, addrsChanged bool
config, ok := xdsB.lidToConfig[lid]
Expand Down
4 changes: 1 addition & 3 deletions balancer/xds/lrs/lrs.go
Expand Up @@ -161,9 +161,7 @@ func NewStore(serviceName string) Store {
},
},
},
backoff: backoff.Exponential{
MaxDelay: 120 * time.Second,
},
backoff: backoff.NewExponentialBuilder().Build(),
lastReported: time.Now(),
}
}
Expand Down
98 changes: 31 additions & 67 deletions balancer/xds/xds_client.go
Expand Up @@ -23,42 +23,34 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/xds/internal"
cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
adsgrpc "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
"google.golang.org/grpc/balancer/xds/lrs"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
)

// Metadata keys and resource type URLs used when building xDS discovery
// requests.
const (
	// grpcHostname is the node-metadata key whose value is the service name
	// in the CDS request.
	grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"
	// cdsType is the type URL of the Cluster resource.
	// NOTE(review): presumably the TypeUrl of the CDS request; confirm
	// against newCDSRequest.
	cdsType = "type.googleapis.com/envoy.api.v2.Cluster"
	// edsType is the type URL of the ClusterLoadAssignment resource, sent as
	// the TypeUrl of the EDS request.
	edsType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
	// endpointRequired is the node-metadata key of the boolean set from
	// enableCDS in the EDS request.
	endpointRequired = "endpoints_required"
)

var (
defaultBackoffConfig = backoff.Exponential{
MaxDelay: 120 * time.Second,
}
)
var defaultBackoffStrategy = backoff.NewExponentialBuilder().Build()

// client is responsible for connecting to the specified traffic director, passing the received
// ADS response from the traffic director, and sending notification when communication with the
// traffic director is lost.
type client struct {
ctx context.Context
cancel context.CancelFunc
cli adsgrpc.AggregatedDiscoveryServiceClient
cli xdsdiscoverypb.AggregatedDiscoveryServiceClient
opts balancer.BuildOptions
balancerName string // the traffic director name
serviceName string // the user dial target name
Expand All @@ -68,9 +60,6 @@ type client struct {
cleanup func()
backoff backoff.Strategy

loadStore lrs.Store
loadReportOnce sync.Once

mu sync.Mutex
cc *grpc.ClientConn
}
Expand Down Expand Up @@ -128,13 +117,13 @@ func (c *client) dial() {
c.mu.Unlock()
}

func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
cdsReq := &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
func (c *client) newCDSRequest() *xdspb.DiscoveryRequest {
cdsReq := &xdspb.DiscoveryRequest{
Node: &xdscorepb.Node{
Metadata: &types.Struct{
Fields: map[string]*types.Value{
grpcHostname: {
Kind: &types.Value_StringValue{StringValue: c.serviceName},
},
},
},
Expand All @@ -144,37 +133,25 @@ func (c *client) newCDSRequest() *discoverypb.DiscoveryRequest {
return cdsReq
}

func (c *client) newEDSRequest() *discoverypb.DiscoveryRequest {
edsReq := &discoverypb.DiscoveryRequest{
Node: &basepb.Node{
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
internal.GrpcHostname: {
Kind: &structpb.Value_StringValue{StringValue: c.serviceName},
},
func (c *client) newEDSRequest() *xdspb.DiscoveryRequest {
edsReq := &xdspb.DiscoveryRequest{
Node: &xdscorepb.Node{
Metadata: &types.Struct{
Fields: map[string]*types.Value{
endpointRequired: {
Kind: &structpb.Value_BoolValue{BoolValue: c.enableCDS},
Kind: &types.Value_BoolValue{BoolValue: c.enableCDS},
},
},
},
},
// TODO: the expected ResourceName could be in a different format from
// dial target. (test_service.test_namespace.traffic_director.com vs
// test_namespace:test_service).
//
// The solution today is to always include GrpcHostname in metadata,
// with the value set to dial target.
//
// A future solution could be: always do CDS, get cluster name from CDS
// response, and use it here.
// `ResourceNames: []string{c.clusterName},`
TypeUrl: edsType,
ResourceNames: []string{c.serviceName},
TypeUrl: edsType,
}
return edsReq
}

func (c *client) makeADSCall() {
c.cli = adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)
c.cli = xdsdiscoverypb.NewAggregatedDiscoveryServiceClient(c.cc)
retryCount := 0
var doRetry bool

Expand Down Expand Up @@ -219,21 +196,18 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
if c.enableCDS {
if err := st.Send(c.newCDSRequest()); err != nil {
// current stream is broken, start a new one.
grpclog.Infof("xds: ads RPC failed due to err: %v, when sending the CDS request ", err)
return
}
}
if err := st.Send(c.newEDSRequest()); err != nil {
// current stream is broken, start a new one.
grpclog.Infof("xds: ads RPC failed due to err: %v, when sending the EDS request", err)
return
}
expectCDS := c.enableCDS
for {
resp, err := st.Recv()
if err != nil {
// current stream is broken, start a new one.
grpclog.Infof("xds: ads RPC failed due to err: %v, when receiving the response", err)
return
}
firstRespReceived = true
Expand All @@ -248,15 +222,15 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
// start a new call as we receive CDS response when in EDS-only mode.
return
}
var adsResp ptypes.DynamicAny
if err := ptypes.UnmarshalAny(resources[0], &adsResp); err != nil {
var adsResp types.DynamicAny
if err := types.UnmarshalAny(&resources[0], &adsResp); err != nil {
grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
return
}
switch adsResp.Message.(type) {
case *cdspb.Cluster:
case *xdspb.Cluster:
expectCDS = false
case *edspb.ClusterLoadAssignment:
case *xdspb.ClusterLoadAssignment:
if expectCDS {
grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
return
Expand All @@ -266,28 +240,18 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
return
}
// Only start load reporting after ADS resp is received.
//
// Also, newADS() will close the previous load reporting stream, so we
// don't have double reporting.
c.loadReportOnce.Do(func() {
if c.loadStore != nil {
go c.loadStore.ReportTo(c.ctx, c.cc)
}
})
}
}

func newXDSClient(balancerName string, enableCDS bool, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
c := &client{
balancerName: balancerName,
serviceName: opts.Target.Endpoint,
serviceName: serviceName,
enableCDS: enableCDS,
opts: opts,
newADS: newADS,
loseContact: loseContact,
cleanup: exitCleanup,
backoff: defaultBackoffConfig,
backoff: defaultBackoffStrategy,
loadStore: loadStore,
}

Expand Down
8 changes: 2 additions & 6 deletions clientconn.go
Expand Up @@ -49,8 +49,6 @@ import (
)

const (
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
// must match grpclbName in grpclb/grpclb.go
grpclbName = "grpclb"
)
Expand Down Expand Up @@ -235,9 +233,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}
if cc.dopts.bs == nil {
cc.dopts.bs = backoff.Exponential{
MaxDelay: DefaultBackoffConfig.MaxDelay,
}
cc.dopts.bs = backoff.NewExponentialBuilder().Build()
}
if cc.dopts.resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
Expand Down Expand Up @@ -1007,7 +1003,7 @@ func (ac *addrConn) resetTransport() {
addrs := ac.addrs
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
// This will be the duration that dial gets to finish.
dialDuration := minConnectTimeout
dialDuration := ac.dopts.mct
if ac.dopts.minConnectTimeout != nil {
dialDuration = ac.dopts.minConnectTimeout()
}
Expand Down