Skip to content

Commit

Permalink
resolver: add State fields to support error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 2, 2019
1 parent 92635fa commit 5cb2152
Show file tree
Hide file tree
Showing 20 changed files with 396 additions and 226 deletions.
13 changes: 11 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,23 @@ type ClientConnState struct {
BalancerConfig serviceconfig.LoadBalancingConfig
}

// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")

// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
UpdateClientConnState(ClientConnState)
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
Expand Down
9 changes: 8 additions & 1 deletion balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (bb *baseBuilder) Name() string {
return bb.name
}

var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer

type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
Expand All @@ -70,7 +72,11 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented")
}

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
func (b *baseBalancer) ResolverError(error) {
// Ignore
}

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
Expand Down Expand Up @@ -101,6 +107,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// The entry will be deleted in HandleSubConnStateChange.
}
}
return nil
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
Expand Down
18 changes: 13 additions & 5 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
return lb
}

var _ balancer.V2Balancer = (*lbBalancer)(nil) // Assert that we implement V2Balancer

type lbBalancer struct {
cc *lbCacheClientConn
target string
Expand Down Expand Up @@ -422,16 +424,21 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
func (lb *lbBalancer) ResolverError(error) {
// Ignore resolver errors. GRPCLB is not selected unless the resolver
// works at least once.
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

addrs := ccs.ResolverState.Addresses
if len(addrs) <= 0 {
return
if len(addrs) == 0 {
return balancer.ErrBadResolverState
}

var remoteBalancerAddrs, backendAddrs []resolver.Address
Expand All @@ -445,9 +452,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
}

if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
if len(remoteBalancerAddrs) == 0 {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return
return balancer.ErrBadResolverState
}
// First time receiving resolved addresses, create a cc to remote
// balancers.
Expand All @@ -469,6 +476,7 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
return nil
}

func (lb *lbBalancer) Close() {
Expand Down
22 changes: 10 additions & 12 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
Expand Down Expand Up @@ -853,12 +852,6 @@ func TestFallback(t *testing.T) {
}

func TestGRPCLBPickFirst(t *testing.T) {
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
svcCfg, err := serviceconfig.Parse(pfc)
if err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, err)
}

defer leakcheck.Check(t)

r, cleanup := manual.GenerateAndRegisterManualResolver()
Expand Down Expand Up @@ -908,14 +901,19 @@ func TestGRPCLBPickFirst(t *testing.T) {
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}

// Start with sub policy pick_first.
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
svcCfg := r.CC.ParseServiceConfig(pfc)
if _, err := svcCfg.Get(); err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, err)
}

r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: svcCfg,
ServiceConfigGetter: svcCfg,
})

result = ""
Expand Down Expand Up @@ -954,9 +952,9 @@ func TestGRPCLBPickFirst(t *testing.T) {
}

// Switch sub policy to roundrobin.
grpclbServiceConfigEmpty, err := serviceconfig.Parse(`{}`)
if err != nil {
t.Fatalf("Error parsing config %q: %v", grpclbServiceConfigEmpty, err)
grpclbServiceConfigEmpty := r.CC.ParseServiceConfig(`{}`)
if _, err := grpclbServiceConfigEmpty.Get(); err != nil {
t.Fatalf("Error parsing config %q: %v", `{}`, err)
}

r.UpdateState(resolver.State{
Expand All @@ -965,7 +963,7 @@ func TestGRPCLBPickFirst(t *testing.T) {
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: grpclbServiceConfigEmpty,
ServiceConfigGetter: grpclbServiceConfigEmpty,
})

result = ""
Expand Down
9 changes: 8 additions & 1 deletion balancer/xds/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ type edsBalancerInterface interface {
Close()
}

var _ balancer.V2Balancer = (*xdsBalancer)(nil) // Assert that we implement V2Balancer

// xdsBalancer manages xdsClient and the actual balancer that does load balancing (either edsBalancer,
// or fallback LB).
type xdsBalancer struct {
Expand Down Expand Up @@ -402,11 +404,16 @@ func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
}
}

func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) {
func (x *xdsBalancer) ResolverError(error) {
// Ignore for now
}

func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
select {
case x.grpcUpdate <- &s:
case <-x.ctx.Done():
}
return nil
}

type cdsResp struct {
Expand Down
63 changes: 31 additions & 32 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -86,10 +87,10 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
ccUpdateCh chan *balancer.ClientConnState
done chan struct{}
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
Expand All @@ -99,8 +100,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
Expand All @@ -115,34 +115,20 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
if ccb.done.HasFired() {
break
}
ccb.balancerMu.Lock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
}

select {
case <-ccb.done:
if ccb.done.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
Expand All @@ -153,14 +139,13 @@ func (ccb *ccBalancerWrapper) watcher() {
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
default:
}
ccb.cc.firstResolveEvent.Fire()
}
}

func (ccb *ccBalancerWrapper) close() {
close(ccb.done)
ccb.done.Fire()
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
Expand All @@ -180,8 +165,11 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
cbn := ccb.cc.curBalancerName
ccb.cc.mu.Unlock()
defer ccb.cc.mu.Lock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
Expand All @@ -193,11 +181,22 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
i++
}
}
select {
case <-ccb.ccUpdateCh:
default:

ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
return ub.UpdateClientConnState(*ccs)
}
ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
return nil
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ccb.balancerMu.Lock()
ub.ResolverError(err)
ccb.balancerMu.Unlock()
}
ccb.ccUpdateCh <- ccs
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down

0 comments on commit 5cb2152

Please sign in to comment.