Skip to content

Commit

Permalink
ensure LB policy is closed before closing resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed May 21, 2021
1 parent d6ba5e4 commit 3747ef1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
12 changes: 8 additions & 4 deletions balancer_conn_wrappers.go
Expand Up @@ -44,6 +44,7 @@ type ccBalancerWrapper struct {
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
scBuffer *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event

mu sync.Mutex
Expand All @@ -54,6 +55,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
scBuffer: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
}
Expand All @@ -69,17 +71,17 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case t := <-ccb.scBuffer.Get():
ccb.scBuffer.Load()
if ccb.done.HasFired() {
if ccb.closed.HasFired() {
break
}
ccb.balancerMu.Lock()
su := t.(*scStateUpdate)
ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
case <-ccb.closed.Done():
}

if ccb.done.HasFired() {
if ccb.closed.HasFired() {
ccb.balancer.Close()
ccb.mu.Lock()
scs := ccb.subConns
Expand All @@ -89,13 +91,15 @@ func (ccb *ccBalancerWrapper) watcher() {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})
ccb.done.Fire()
return
}
}
}

func (ccb *ccBalancerWrapper) close() {
ccb.done.Fire()
ccb.closed.Fire()
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
Expand Down
6 changes: 3 additions & 3 deletions clientconn.go
Expand Up @@ -1046,12 +1046,12 @@ func (cc *ClientConn) Close() error {

cc.blockingpicker.close()

if rWrapper != nil {
rWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
rWrapper.close()
}

for ac := range conns {
ac.tearDown(ErrClientConnClosing)
Expand Down
4 changes: 4 additions & 0 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Expand Up @@ -61,6 +61,7 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions)
ClientConn: cc,
bOpts: bOpts,
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
Expand Down Expand Up @@ -110,6 +111,7 @@ type clusterImplBalancer struct {
// synchronized with Close().
mu sync.Mutex
closed *grpcsync.Event
done *grpcsync.Event

bOpts balancer.BuildOptions
logger *grpclog.PrefixLogger
Expand Down Expand Up @@ -329,6 +331,7 @@ func (cib *clusterImplBalancer) Close() {
if newXDSClient != nil {
cib.xdsC.Close()
}
<-cib.done.Done()
cib.logger.Infof("Shutdown")
}

Expand Down Expand Up @@ -376,6 +379,7 @@ type dropConfigs struct {
}

func (cib *clusterImplBalancer) run() {
defer cib.done.Fire()
for {
select {
case update := <-cib.pickerUpdateCh.Get():
Expand Down

0 comments on commit 3747ef1

Please sign in to comment.