address review comments
dfawley committed Aug 16, 2019
1 parent 5cb2152 commit ef03f65
Showing 3 changed files with 52 additions and 41 deletions.
18 changes: 1 addition & 17 deletions balancer_conn_wrappers.go
@@ -87,7 +87,7 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
done *grpcsync.Event
@@ -166,22 +166,6 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
}

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
cbn := ccb.cc.curBalancerName
ccb.cc.mu.Unlock()
defer ccb.cc.mu.Lock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
s := &ccs.ResolverState
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
}

ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
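
The only change kept in this file is the clarified comment on balancerMu; the grpclb address filtering that used to live here in updateClientConnState moves to clientconn.go below. As context for that comment, here is a minimal sketch of the locking discipline it records: every call into the wrapped balancer goes through the mutex, so the balancer never has to handle concurrent callbacks. The types and names below are illustrative stand-ins, not the grpc-go API.

// Sketch: serializing every call into a wrapped balancer with one mutex,
// which is the invariant the new comment on balancerMu documents.
package main

import (
	"fmt"
	"sync"
)

type fakeBalancer struct{ updates int }

func (b *fakeBalancer) HandleResolvedAddrs(addrs []string, err error) { b.updates++ }

type balancerWrapperSketch struct {
	balancerMu sync.Mutex // synchronizes calls to the balancer
	balancer   *fakeBalancer
}

// updateAddrs is the only path into the balancer, and it always holds
// balancerMu, so the balancer never sees concurrent callbacks.
func (w *balancerWrapperSketch) updateAddrs(addrs []string) {
	w.balancerMu.Lock()
	defer w.balancerMu.Unlock()
	w.balancer.HandleResolvedAddrs(addrs, nil)
}

func main() {
	w := &balancerWrapperSketch{balancer: &fakeBalancer{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.updateAddrs([]string{"10.0.0.1:443"})
		}()
	}
	wg.Wait()
	fmt.Println("balancer updates:", w.balancer.updates) // 4, applied one at a time
}
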
24 changes: 21 additions & 3 deletions clientconn.go
@@ -556,23 +556,26 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
cc.mu.Lock()
defer cc.mu.Unlock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
cc.mu.Unlock()
return nil
}

if err != nil {
// May need to apply the initial service config
// May need to apply the initial service config in case the resolver
// doesn't support service configs, or doesn't provide a service config
// with the new addresses.
cc.maybeApplyDefaultServiceConfig(nil)

if cc.balancerWrapper != nil {
cc.balancerWrapper.resolverError(err)
}

// No addresses are valid with err set; return early.
cc.mu.Unlock()
return balancer.ErrBadResolverState
}

@@ -594,7 +597,22 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}
uccsErr := cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
continue
}
i++
}
}
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolverState since any other error is
// currently meaningless to the caller.
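
The grpclb filter that moved into updateResolverState removes addresses in place with the copy-and-truncate idiom, and it now runs after cc.mu is released, which is why curBalancerName and the balancer wrapper are copied into locals first. Below is a standalone sketch of the same loop, assuming this repository's resolver package as of this commit; the helper name filterGRPCLBAddresses is hypothetical, since the real code keeps the loop inline.

package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

// filterGRPCLBAddresses drops addresses of type resolver.GRPCLB in place,
// mirroring the loop updateResolverState now runs when the active balancer
// is not grpclb. Each match is removed by shifting the tail left with copy
// and truncating the slice, so index i is re-checked rather than advanced.
func filterGRPCLBAddresses(addrs []resolver.Address) []resolver.Address {
	for i := 0; i < len(addrs); {
		if addrs[i].Type == resolver.GRPCLB {
			copy(addrs[i:], addrs[i+1:])
			addrs = addrs[:len(addrs)-1]
			continue
		}
		i++
	}
	return addrs
}

func main() {
	addrs := []resolver.Address{
		{Addr: "10.0.0.1:443", Type: resolver.GRPCLB},
		{Addr: "10.0.0.2:443"},
	}
	fmt.Println(filterGRPCLBAddresses(addrs)) // only the non-grpclb address remains
}
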
51 changes: 30 additions & 21 deletions resolver_conn_wrapper.go
@@ -22,13 +22,13 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@@ -40,7 +40,7 @@ type ccResolverWrapper struct {
resolver resolver.Resolver
addrCh chan []resolver.Address
scCh chan string
done uint32 // accessed atomically; set to 1 when closed.
done *grpcsync.Event
curState resolver.State

mu sync.Mutex // protects polling
@@ -91,6 +91,7 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
cc: cc,
addrCh: make(chan []resolver.Address, 1),
scCh: make(chan string, 1),
done: grpcsync.NewEvent(),
}

var err error
@@ -102,21 +103,28 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
}

func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) {
ccr.resolver.ResolveNow(o)
ccr.mu.Lock()
if !ccr.done.HasFired() {
ccr.resolver.ResolveNow(o)
}
ccr.mu.Unlock()
}

func (ccr *ccResolverWrapper) close() {
ccr.mu.Lock()
ccr.resolver.Close()
atomic.StoreUint32(&ccr.done, 1)
ccr.done.Fire()
ccr.mu.Unlock()
}

var resolverBackoff = backoff.Exponential{MaxDelay: 2 * time.Minute}

// poll begins or ends asynchronous polling of the resolver.
func (ccr *ccResolverWrapper) poll(run bool) {
// poll begins or ends asynchronous polling of the resolver based on whether
// err is ErrBadResolverState.
func (ccr *ccResolverWrapper) poll(err error) {
ccr.mu.Lock()
defer ccr.mu.Unlock()
if !run {
if err != balancer.ErrBadResolverState {
// stop polling
if ccr.polling != nil {
close(ccr.polling)
@@ -131,37 +139,38 @@ func (ccr *ccResolverWrapper) poll(run bool) {
p := make(chan struct{})
ccr.polling = p
go func() {
for i := 0; true; i++ {
for i := 0; ; i++ {
ccr.resolveNow(resolver.ResolveNowOption{})
t := time.NewTimer(resolverBackoff.Backoff(i))
select {
case <-p:
t.Stop()
return
case <-ccr.done.Done():
// Resolver has been closed.
t.Stop()
return
case <-t.C:
// Timer expired; re-resolve.
}
}
}()
}

func (ccr *ccResolverWrapper) isDone() bool {
return atomic.LoadUint32(&ccr.done) == 1
}

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
ccr.curState = s
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil) == balancer.ErrBadResolverState)
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}

func (ccr *ccResolverWrapper) ReportError(err error) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
@@ -171,26 +180,26 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
Severity: channelz.CtWarning,
})
}
ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err) == balancer.ErrBadResolverState)
ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
}

// NewAddress is called by the resolver implementation to send addresses to gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfigGetter: ccr.curState.ServiceConfigGetter})
}
ccr.curState.Addresses = addrs
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil) == balancer.ErrBadResolverState)
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}

// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
if ccr.isDone() {
if ccr.done.HasFired() {
return
}
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
@@ -203,14 +212,14 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
Severity: channelz.CtWarning,
})
}
ccr.poll(true)
ccr.poll(balancer.ErrBadResolverState)
return
}
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfigGetter: scg})
}
ccr.curState.ServiceConfigGetter = scg
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil) == balancer.ErrBadResolverState)
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
}

func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.Getter {
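
The done field changes from an atomic uint32 to a grpcsync.Event. The practical difference is the Done() channel: the backoff goroutine started by poll can select on it and exit as soon as the wrapper is closed, rather than only noticing a flag at its next wakeup, and HasFired() replaces the old atomic load. The sketch below is a simplified, self-contained illustration of that interplay; the event type is a stand-in for the internal grpcsync.Event, not the real package, and the polling channel used to stop polling early is omitted.

package main

import (
	"fmt"
	"sync"
	"time"
)

// event is an illustrative stand-in for internal/grpcsync.Event, limited to
// the three methods this commit relies on: Fire, HasFired, and Done.
type event struct {
	once sync.Once
	c    chan struct{}
}

func newEvent() *event { return &event{c: make(chan struct{})} }

// Fire marks the event as fired; safe to call more than once.
func (e *event) Fire() { e.once.Do(func() { close(e.c) }) }

// HasFired reports whether Fire has been called, replacing the old
// atomic.LoadUint32(&ccr.done) == 1 check.
func (e *event) HasFired() bool {
	select {
	case <-e.c:
		return true
	default:
		return false
	}
}

// Done returns a channel closed once Fire has been called; this is what the
// polling goroutine can select on, which the atomic flag could not offer.
func (e *event) Done() <-chan struct{} { return e.c }

func main() {
	done := newEvent()

	// Shape of the loop inside poll(): re-resolve, then wait for either the
	// backoff timer or the wrapper being closed.
	go func() {
		for i := 0; ; i++ {
			// ccr.resolveNow(resolver.ResolveNowOption{}) would go here.
			t := time.NewTimer(10 * time.Millisecond) // stand-in for resolverBackoff.Backoff(i)
			select {
			case <-done.Done():
				// Resolver has been closed; stop re-resolving promptly.
				t.Stop()
				fmt.Println("polling goroutine exits")
				return
			case <-t.C:
				// Timer expired; loop and re-resolve.
			}
		}
	}()

	time.Sleep(25 * time.Millisecond)
	done.Fire()
	time.Sleep(5 * time.Millisecond)
	fmt.Println("HasFired:", done.HasFired()) // true
}
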
