diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index f1bb6dd3073..dd839796397 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -43,7 +43,7 @@ type ccBalancerWrapper struct { cc *ClientConn balancerMu sync.Mutex // synchronizes calls to the balancer balancer balancer.Balancer - scBuffer *buffer.Unbounded + updateCh *buffer.Unbounded closed *grpcsync.Event done *grpcsync.Event @@ -54,7 +54,7 @@ type ccBalancerWrapper struct { func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { ccb := &ccBalancerWrapper{ cc: cc, - scBuffer: buffer.NewUnbounded(), + updateCh: buffer.NewUnbounded(), closed: grpcsync.NewEvent(), done: grpcsync.NewEvent(), subConns: make(map[*acBalancerWrapper]struct{}), @@ -69,15 +69,26 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui func (ccb *ccBalancerWrapper) watcher() { for { select { - case t := <-ccb.scBuffer.Get(): - ccb.scBuffer.Load() + case t := <-ccb.updateCh.Get(): + ccb.updateCh.Load() if ccb.closed.HasFired() { break } - ccb.balancerMu.Lock() - su := t.(*scStateUpdate) - ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) - ccb.balancerMu.Unlock() + switch u := t.(type) { + case *scStateUpdate: + ccb.balancerMu.Lock() + ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err}) + ccb.balancerMu.Unlock() + case *acBalancerWrapper: + ccb.mu.Lock() + if ccb.subConns != nil { + delete(ccb.subConns, u) + ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain) + } + ccb.mu.Unlock() + default: + logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t) + } case <-ccb.closed.Done(): } @@ -118,7 +129,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co if sc == nil { return } - ccb.scBuffer.Put(&scStateUpdate{ + ccb.updateCh.Put(&scStateUpdate{ 
sc: sc, state: s, err: err, @@ -159,17 +170,10 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer } func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { - acbw, ok := sc.(*acBalancerWrapper) - if !ok { - return - } - ccb.mu.Lock() - defer ccb.mu.Unlock() - if ccb.subConns == nil { - return - } - delete(ccb.subConns, acbw) - ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) + // RemoveSubConn() is handled in the watcher() goroutine, to avoid a deadlock + // during switchBalancer() if the old balancer calls RemoveSubConn() in its + // Close(). + ccb.updateCh.Put(sc) } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 2c6ed576620..e9fee87d8f8 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" @@ -531,6 +532,51 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) { } } +const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer" + +func init() { + stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{ + Close: func(data *stub.BalancerData) { + data.ClientConn.RemoveSubConn(&acBalancerWrapper{}) + }, + }) +} + +// Test switching balancers when the old balancer calls RemoveSubConn in its +// Close. +// +// This test is to make sure that call doesn't cause a deadlock. 
+func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) { + r := manual.NewBuilderWithScheme("whatever") + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil) + // This service config update will switch balancer from + // "test-inline-remove-subconn-balancer" to "pick_first". The test balancer + // will be closed, which will call cc.RemoveSubConn() inline (this + // RemoveSubConn is not required by the API, but some balancers might do + // it). + // + // This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a + // deadlock (e.g. trying to grab a mutex while it's already locked). + // + // Do it in a goroutine so this test will fail with a helpful message + // (though the goroutine will still leak). + done := make(chan struct{}) + go func() { + cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil) + close(done) + }() + select { + case <-time.After(defaultTestTimeout): + t.Fatalf("timeout waiting for updateResolverState to finish") + case <-done: + } +} + func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { scpr := r.CC.ParseServiceConfig(s) if scpr.Err != nil {