Skip to content

Commit

Permalink
resolver: add State fields to support error handling (#2951)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Oct 4, 2019
1 parent aa4eae6 commit ed563a0
Show file tree
Hide file tree
Showing 24 changed files with 659 additions and 268 deletions.
13 changes: 11 additions & 2 deletions balancer/balancer.go
Expand Up @@ -305,14 +305,23 @@ type ClientConnState struct {
BalancerConfig serviceconfig.LoadBalancingConfig
}

// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")

// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
UpdateClientConnState(ClientConnState)
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
Expand Down
9 changes: 8 additions & 1 deletion balancer/base/balancer.go
Expand Up @@ -53,6 +53,8 @@ func (bb *baseBuilder) Name() string {
return bb.name
}

var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer

type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
Expand All @@ -70,7 +72,11 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented")
}

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
func (b *baseBalancer) ResolverError(error) {
// Ignore
}

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
Expand Down Expand Up @@ -101,6 +107,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// The entry will be deleted in HandleSubConnStateChange.
}
}
return nil
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
Expand Down
18 changes: 13 additions & 5 deletions balancer/grpclb/grpclb.go
Expand Up @@ -161,6 +161,8 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
return lb
}

var _ balancer.V2Balancer = (*lbBalancer)(nil) // Assert that we implement V2Balancer

type lbBalancer struct {
cc *lbCacheClientConn
target string
Expand Down Expand Up @@ -410,16 +412,21 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
func (lb *lbBalancer) ResolverError(error) {
// Ignore resolver errors. GRPCLB is not selected unless the resolver
// works at least once.
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

addrs := ccs.ResolverState.Addresses
if len(addrs) <= 0 {
return
if len(addrs) == 0 {
return balancer.ErrBadResolverState
}

var remoteBalancerAddrs, backendAddrs []resolver.Address
Expand All @@ -433,9 +440,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
}

if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
if len(remoteBalancerAddrs) == 0 {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return
return balancer.ErrBadResolverState
}
// First time receiving resolved addresses, create a cc to remote
// balancers.
Expand All @@ -457,6 +464,7 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
return nil
}

func (lb *lbBalancer) Close() {
Expand Down
20 changes: 9 additions & 11 deletions balancer/grpclb/grpclb_test.go
Expand Up @@ -44,7 +44,6 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
Expand Down Expand Up @@ -853,12 +852,6 @@ func TestFallback(t *testing.T) {
}

func TestGRPCLBPickFirst(t *testing.T) {
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
svcCfg, err := serviceconfig.Parse(pfc)
if err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, err)
}

defer leakcheck.Check(t)

r, cleanup := manual.GenerateAndRegisterManualResolver()
Expand Down Expand Up @@ -908,14 +901,19 @@ func TestGRPCLBPickFirst(t *testing.T) {
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}

// Start with sub policy pick_first.
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
scpr := r.CC.ParseServiceConfig(pfc)
if scpr.Err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, scpr.Err)
}

r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: svcCfg,
ServiceConfig: scpr,
})

result = ""
Expand Down Expand Up @@ -954,9 +952,9 @@ func TestGRPCLBPickFirst(t *testing.T) {
}

// Switch sub policy to roundrobin.
grpclbServiceConfigEmpty, err := serviceconfig.Parse(`{}`)
if err != nil {
t.Fatalf("Error parsing config %q: %v", grpclbServiceConfigEmpty, err)
grpclbServiceConfigEmpty := r.CC.ParseServiceConfig(`{}`)
if grpclbServiceConfigEmpty.Err != nil {
t.Fatalf("Error parsing config %q: %v", `{}`, grpclbServiceConfigEmpty.Err)
}

r.UpdateState(resolver.State{
Expand Down
68 changes: 25 additions & 43 deletions balancer_conn_wrappers.go
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -86,10 +87,10 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
ccUpdateCh chan *balancer.ClientConnState
done chan struct{}
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
Expand All @@ -99,8 +100,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
Expand All @@ -115,34 +115,20 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
if ccb.done.HasFired() {
break
}
ccb.balancerMu.Lock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
}

select {
case <-ccb.done:
if ccb.done.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
Expand All @@ -153,14 +139,12 @@ func (ccb *ccBalancerWrapper) watcher() {
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
default:
}
ccb.cc.firstResolveEvent.Fire()
}
}

func (ccb *ccBalancerWrapper) close() {
close(ccb.done)
ccb.done.Fire()
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
Expand All @@ -180,24 +164,22 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
return ub.UpdateClientConnState(*ccs)
}
select {
case <-ccb.ccUpdateCh:
default:
ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
return nil
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ccb.balancerMu.Lock()
ub.ResolverError(err)
ccb.balancerMu.Unlock()
}
ccb.ccUpdateCh <- ccs
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down
92 changes: 92 additions & 0 deletions balancer_conn_wrappers_test.go
@@ -0,0 +1,92 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpc

import (
"fmt"
"testing"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)

var _ balancer.V2Balancer = &funcBalancer{}

type funcBalancer struct {
updateClientConnState func(s balancer.ClientConnState) error
}

func (*funcBalancer) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {
panic("unimplemented") // v1 API
}
func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) {
panic("unimplemented") // v1 API
}
func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return b.updateClientConnState(s)
}
func (*funcBalancer) ResolverError(error) {
panic("unimplemented") // resolver never reports error
}
func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("unimplemented") // we never have sub-conns
}
func (*funcBalancer) Close() {}

type funcBalancerBuilder struct {
name string
instance *funcBalancer
}

func (b *funcBalancerBuilder) Build(balancer.ClientConn, balancer.BuildOptions) balancer.Balancer {
return b.instance
}
func (b *funcBalancerBuilder) Name() string { return b.name }

// TestBalancerErrorResolverPolling injects balancer errors and verifies
// ResolveNow is called on the resolver with the appropriate backoff strategy
// being consulted between ResolveNow calls.
func (s) TestBalancerErrorResolverPolling(t *testing.T) {
// The test balancer will return ErrBadResolverState iff the
// ClientConnState contains no addresses.
fb := &funcBalancer{
updateClientConnState: func(s balancer.ClientConnState) error {
if len(s.ResolverState.Addresses) == 0 {
return balancer.ErrBadResolverState
}
return nil
},
}
const balName = "BalancerErrorResolverPolling"
balancer.Register(&funcBalancerBuilder{name: balName, instance: fb})

testResolverErrorPolling(t,
func(r *manual.Resolver) {
// No addresses so the balancer will fail.
r.CC.UpdateState(resolver.State{})
}, func(r *manual.Resolver) {
// UpdateState will block if ResolveNow is being called (which blocks on
// rn), so call it in a goroutine. Include some address so the balancer
// will be happy.
go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "x"}}})
},
WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)))
}

0 comments on commit ed563a0

Please sign in to comment.