Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolver: add State fields to support error handling #2951

Merged
merged 14 commits into from Oct 4, 2019
13 changes: 11 additions & 2 deletions balancer/balancer.go
Expand Up @@ -305,14 +305,23 @@ type ClientConnState struct {
BalancerConfig serviceconfig.LoadBalancingConfig
}

// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")

// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
UpdateClientConnState(ClientConnState)
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
Expand Down
9 changes: 8 additions & 1 deletion balancer/base/balancer.go
Expand Up @@ -53,6 +53,8 @@ func (bb *baseBuilder) Name() string {
return bb.name
}

var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer

type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
Expand All @@ -70,7 +72,11 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented")
}

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
func (b *baseBalancer) ResolverError(error) {
// Ignore
}

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
Expand Down Expand Up @@ -101,6 +107,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// The entry will be deleted in HandleSubConnStateChange.
}
}
return nil
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
Expand Down
18 changes: 13 additions & 5 deletions balancer/grpclb/grpclb.go
Expand Up @@ -161,6 +161,8 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
return lb
}

var _ balancer.V2Balancer = (*lbBalancer)(nil) // Assert that we implement V2Balancer

type lbBalancer struct {
cc *lbCacheClientConn
target string
Expand Down Expand Up @@ -410,16 +412,21 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
func (lb *lbBalancer) ResolverError(error) {
// Ignore resolver errors. GRPCLB is not selected unless the resolver
// works at least once.
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)

addrs := ccs.ResolverState.Addresses
if len(addrs) <= 0 {
return
if len(addrs) == 0 {
return balancer.ErrBadResolverState
}

var remoteBalancerAddrs, backendAddrs []resolver.Address
Expand All @@ -433,9 +440,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
}

if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) <= 0 {
if len(remoteBalancerAddrs) == 0 {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return
return balancer.ErrBadResolverState
}
// First time receiving resolved addresses, create a cc to remote
// balancers.
Expand All @@ -457,6 +464,7 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
return nil
}

func (lb *lbBalancer) Close() {
Expand Down
20 changes: 9 additions & 11 deletions balancer/grpclb/grpclb_test.go
Expand Up @@ -44,7 +44,6 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
)
Expand Down Expand Up @@ -853,12 +852,6 @@ func TestFallback(t *testing.T) {
}

func TestGRPCLBPickFirst(t *testing.T) {
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
svcCfg, err := serviceconfig.Parse(pfc)
if err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, err)
}

defer leakcheck.Check(t)

r, cleanup := manual.GenerateAndRegisterManualResolver()
Expand Down Expand Up @@ -908,14 +901,19 @@ func TestGRPCLBPickFirst(t *testing.T) {
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}

// Start with sub policy pick_first.
const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
scpr := r.CC.ParseServiceConfig(pfc)
if scpr.Err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, scpr.Err)
}

r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: svcCfg,
ServiceConfig: scpr,
})

result = ""
Expand Down Expand Up @@ -954,9 +952,9 @@ func TestGRPCLBPickFirst(t *testing.T) {
}

// Switch sub policy to roundrobin.
grpclbServiceConfigEmpty, err := serviceconfig.Parse(`{}`)
if err != nil {
t.Fatalf("Error parsing config %q: %v", grpclbServiceConfigEmpty, err)
grpclbServiceConfigEmpty := r.CC.ParseServiceConfig(`{}`)
if grpclbServiceConfigEmpty.Err != nil {
t.Fatalf("Error parsing config %q: %v", `{}`, grpclbServiceConfigEmpty.Err)
}

r.UpdateState(resolver.State{
Expand Down
68 changes: 25 additions & 43 deletions balancer_conn_wrappers.go
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -86,10 +87,10 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
ccUpdateCh chan *balancer.ClientConnState
done chan struct{}
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
Expand All @@ -99,8 +100,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
Expand All @@ -115,34 +115,20 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
if ccb.done.HasFired() {
break
}
ccb.balancerMu.Lock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(t.sc, balancer.SubConnState{ConnectivityState: t.state})
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
}

select {
case <-ccb.done:
if ccb.done.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
Expand All @@ -153,14 +139,12 @@ func (ccb *ccBalancerWrapper) watcher() {
}
ccb.UpdateBalancerState(connectivity.Connecting, nil)
return
default:
}
ccb.cc.firstResolveEvent.Fire()
}
}

func (ccb *ccBalancerWrapper) close() {
close(ccb.done)
ccb.done.Fire()
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
Expand All @@ -180,24 +164,22 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
return ub.UpdateClientConnState(*ccs)
}
select {
case <-ccb.ccUpdateCh:
default:
ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
return nil
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ccb.balancerMu.Lock()
ub.ResolverError(err)
ccb.balancerMu.Unlock()
}
ccb.ccUpdateCh <- ccs
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down
92 changes: 92 additions & 0 deletions balancer_conn_wrappers_test.go
@@ -0,0 +1,92 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpc

import (
"fmt"
"testing"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)

// Compile-time assertion that funcBalancer implements balancer.V2Balancer.
var _ balancer.V2Balancer = &funcBalancer{}

// funcBalancer is a test balancer whose UpdateClientConnState behavior is
// supplied by the updateClientConnState function field. Its other balancer
// methods either panic or do nothing.
type funcBalancer struct {
updateClientConnState func(s balancer.ClientConnState) error
}

// HandleSubConnStateChange is part of the v1 balancer API and always panics;
// this test balancer is only meant to be driven through the V2Balancer methods.
func (*funcBalancer) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {
panic("unimplemented") // v1 API
}
// HandleResolvedAddrs is part of the v1 balancer API and always panics; this
// test balancer is only meant to be driven through the V2Balancer methods.
func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) {
panic("unimplemented") // v1 API
}
// UpdateClientConnState forwards s to the updateClientConnState function field
// and returns its result unchanged. The field must be non-nil.
func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return b.updateClientConnState(s)
}
// ResolverError always panics: tests using funcBalancer are not expected to
// have the resolver report an error.
func (*funcBalancer) ResolverError(error) {
panic("unimplemented") // resolver never reports error
}
// UpdateSubConnState always panics: funcBalancer never creates SubConns, so no
// SubConn state change should ever be delivered to it.
func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("unimplemented") // we never have sub-conns
}
// Close is a no-op; funcBalancer holds no resources to release.
func (*funcBalancer) Close() {}

// funcBalancerBuilder is a balancer builder for tests. It reports a fixed name
// and hands out the same pre-built funcBalancer on every Build call.
type funcBalancerBuilder struct {
name string // value returned by Name
instance *funcBalancer // balancer returned by Build
}

// Build ignores its arguments and returns the builder's pre-configured
// funcBalancer instance; no new balancer is constructed.
func (b *funcBalancerBuilder) Build(balancer.ClientConn, balancer.BuildOptions) balancer.Balancer {
return b.instance
}
// Name returns the name this builder was configured with.
func (b *funcBalancerBuilder) Name() string { return b.name }

// TestBalancerErrorResolverPolling makes the balancer reject resolver updates
// and checks that ResolveNow keeps being called on the resolver, with the
// backoff strategy consulted between calls, until an update is accepted.
func (s) TestBalancerErrorResolverPolling(t *testing.T) {
	const balName = "BalancerErrorResolverPolling"

	// The test balancer accepts a ClientConnState only when it carries at
	// least one address; otherwise it returns ErrBadResolverState.
	rejectEmpty := func(ccs balancer.ClientConnState) error {
		if len(ccs.ResolverState.Addresses) > 0 {
			return nil
		}
		return balancer.ErrBadResolverState
	}
	balancer.Register(&funcBalancerBuilder{
		name:     balName,
		instance: &funcBalancer{updateClientConnState: rejectEmpty},
	})

	// An empty state has no addresses, so the balancer will fail.
	sendBadState := func(r *manual.Resolver) {
		r.CC.UpdateState(resolver.State{})
	}

	// UpdateState will block if ResolveNow is being called (which blocks on
	// rn), so it is issued from a goroutine. The state includes an address
	// so the balancer accepts it.
	sendGoodState := func(r *manual.Resolver) {
		go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "x"}}})
	}

	// Select the test balancer through the default service config.
	svcCfg := fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)
	testResolverErrorPolling(t, sendBadState, sendGoodState, WithDefaultServiceConfig(svcCfg))
}