Skip to content

Commit

Permalink
client: handle RemoveSubConn in goroutine to avoid deadlock (#4504)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Jun 3, 2021
1 parent 174b1c2 commit 0956b12
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 20 deletions.
44 changes: 24 additions & 20 deletions balancer_conn_wrappers.go
Expand Up @@ -43,7 +43,7 @@ type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
scBuffer *buffer.Unbounded
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event

Expand All @@ -54,7 +54,7 @@ type ccBalancerWrapper struct {
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
scBuffer: buffer.NewUnbounded(),
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
subConns: make(map[*acBalancerWrapper]struct{}),
Expand All @@ -69,15 +69,26 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
case t := <-ccb.scBuffer.Get():
ccb.scBuffer.Load()
case t := <-ccb.updateCh.Get():
ccb.updateCh.Load()
if ccb.closed.HasFired() {
break
}
ccb.balancerMu.Lock()
su := t.(*scStateUpdate)
ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
ccb.balancerMu.Unlock()
switch u := t.(type) {
case *scStateUpdate:
ccb.balancerMu.Lock()
ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
ccb.balancerMu.Unlock()
case *acBalancerWrapper:
ccb.mu.Lock()
if ccb.subConns != nil {
delete(ccb.subConns, u)
ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
}
ccb.mu.Unlock()
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
}
case <-ccb.closed.Done():
}

Expand Down Expand Up @@ -118,7 +129,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
if sc == nil {
return
}
ccb.scBuffer.Put(&scStateUpdate{
ccb.updateCh.Put(&scStateUpdate{
sc: sc,
state: s,
err: err,
Expand Down Expand Up @@ -159,17 +170,10 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}

func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return
}
delete(ccb.subConns, acbw)
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
// The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
// during switchBalancer() if the old balancer calls RemoveSubConn() in its
// Close().
ccb.updateCh.Put(sc)
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
Expand Down
46 changes: 46 additions & 0 deletions balancer_switching_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -531,6 +532,51 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
}
}

const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer"

func init() {
stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{
Close: func(data *stub.BalancerData) {
data.ClientConn.RemoveSubConn(&acBalancerWrapper{})
},
})
}

// Test that when switching to balancers, the old balancer calls RemoveSubConn
// in Close.
//
// This test is to make sure this close doesn't cause a deadlock.
func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil)
// This service config update will switch balancer from
// "test-inline-remove-subconn-balancer" to "pick_first". The test balancer
// will be closed, which will call cc.RemoveSubConn() inline (this
// RemoveSubConn is not required by the API, but some balancers might do
// it).
//
// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
// deadlock (e.g. trying to grab a mutex while it's already locked).
//
// Do it in a goroutine so this test will fail with a helpful message
// (though the goroutine will still leak).
done := make(chan struct{})
go func() {
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil)
close(done)
}()
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for updateResolverState to finish")
case <-done:
}
}

func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
scpr := r.CC.ParseServiceConfig(s)
if scpr.Err != nil {
Expand Down

0 comments on commit 0956b12

Please sign in to comment.