Skip to content

Commit

Permalink
balancer: Add UpdateAddresses() to balancer.ClientConn interface
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Feb 18, 2021
1 parent 1b75f71 commit 3c23e8d
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 3 deletions.
10 changes: 10 additions & 0 deletions balancer/balancer.go
Expand Up @@ -101,6 +101,9 @@ type SubConn interface {
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
//
// Deprecated: This method is now part of the ClientConn interface and will
// eventually be removed from here.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
Expand Down Expand Up @@ -143,6 +146,13 @@ type ClientConn interface {
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
RemoveSubConn(SubConn)
// UpdateAddresses updates the addresses used in the passed in SubConn.
// gRPC checks if the currently connected address is still in the new list.
// If so, the connection will be kept. Else, the connection will be
// gracefully closed, and a new connection will be created.
//
// This will trigger a state transition for the SubConn.
UpdateAddresses(SubConn, []resolver.Address)

// UpdateState notifies gRPC that the balancer's internal state has
// changed.
Expand Down
2 changes: 1 addition & 1 deletion balancer/base/balancer.go
Expand Up @@ -135,7 +135,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
sc.UpdateAddresses([]resolver.Address{a})
b.cc.UpdateAddresses(sc, []resolver.Address{a})
}
}
for a, sc := range b.subConns {
Expand Down
2 changes: 1 addition & 1 deletion balancer/grpclb/grpclb_remote_balancer.go
Expand Up @@ -140,7 +140,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
break
}
if sc != nil {
sc.UpdateAddresses(backendAddrs)
lb.cc.cc.UpdateAddresses(sc, backendAddrs)
sc.Connect()
return
}
Expand Down
16 changes: 16 additions & 0 deletions balancer_conn_wrappers.go
Expand Up @@ -163,6 +163,22 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}

func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}

ccb.mu.Lock()
if ccb.subConns == nil {
ccb.mu.Unlock()
return
}
ccb.mu.Unlock()

acbw.UpdateAddresses(addrs)
}

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pickfirst.go
Expand Up @@ -84,7 +84,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
b.sc.Connect()
} else {
b.sc.UpdateAddresses(cs.ResolverState.Addresses)
b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
b.sc.Connect()
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions xds/internal/testutils/balancer.go
Expand Up @@ -123,6 +123,9 @@ func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
}
}

// UpdateAddresses updates the addresses on the SubConn.
func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {}

// UpdateState updates connectivity state and picker.
func (tcc *TestClientConn) UpdateState(bs balancer.State) {
tcc.logger.Logf("testClientConn: UpdateState(%v)", bs)
Expand Down

0 comments on commit 3c23e8d

Please sign in to comment.