Skip to content

Commit

Permalink
clientconn: do not automatically reconnect addrConns; go idle instead (
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 10, 2021
1 parent 01babab commit 997ce61
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 163 deletions.
17 changes: 12 additions & 5 deletions balancer/balancer.go
Expand Up @@ -353,18 +353,20 @@ var ErrBadResolverState = errors.New("bad resolver state")
//
// It's not thread safe.
type ConnectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transient failure state.
}

// RecordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If at least one SubConn in Ready, the aggregated state is Ready;
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
// - Else the aggregated state is TransientFailure.
// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure;
// - Else the aggregated state is Idle
//
// Idle and Shutdown are not considered.
// Shutdown is not considered.
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
Expand All @@ -374,6 +376,8 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}

Expand All @@ -384,5 +388,8 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
if cse.numTransientFailure > 0 {
return connectivity.TransientFailure
}
return connectivity.Idle
}
14 changes: 7 additions & 7 deletions balancer_conn_wrappers.go
Expand Up @@ -239,25 +239,25 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
return
}

ac, err := cc.newAddrConn(addrs, opts)
newAC, err := cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
return
}
acbw.ac = ac
ac.mu.Lock()
ac.acbw = acbw
ac.mu.Unlock()
acbw.ac = newAC
newAC.mu.Lock()
newAC.acbw = acbw
newAC.mu.Unlock()
if acState != connectivity.Idle {
ac.connect()
go newAC.connect()
}
}
}

func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.ac.connect()
go acbw.ac.connect()
}

func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
Expand Down

0 comments on commit 997ce61

Please sign in to comment.