ringhash: handle config updates properly #5557

Merged: 5 commits, Aug 4, 2022
34 changes: 16 additions & 18 deletions xds/internal/balancer/ringhash/ring.go
@@ -19,7 +19,6 @@
package ringhash

import (
"fmt"
"math"
"sort"
"strconv"
@@ -64,12 +63,12 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and the first item with hash >= the given hash will be returned.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
//
// Must be called with a non-empty subConns map.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ring {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
return nil, err
}
normalizedWeights, minWeight := normalizeWeights(subConns)

// Normalized weights for {3,3,4} are {0.3,0.3,0.4}.

// Scale up the size of the ring such that the least-weighted host gets a
@@ -106,30 +105,29 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*r
for i, ii := range items {
ii.idx = i
}
return &ring{items: items}, nil
return &ring{items: items}
}

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
keys := subConns.Keys()
if len(keys) == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
//
// Must be called with a non-empty subConns map.
Contributor:

I'm guessing "non-empty" encapsulates nil and len(0)?

Contributor Author:

That's right.

func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) {
var weightSum uint32
keys := subConns.Keys()
for _, a := range keys {
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
// getWeightAttribute() returns 1 if the weight attribute is not found
// on the address. And since this function is guaranteed to be called
// with a non-empty subConns map, weightSum is guaranteed to be
// non-zero. So, we need not worry about a divide-by-zero error here.
nw := float64(getWeightAttribute(a)) / float64(weightSum)
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
@@ -142,7 +140,7 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float
// where an address is added and then removed, the RPCs will still pick the
// same old SubConn.
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr })
return ret, min, nil
return ret, min
}

// pick does a binary search. It returns the item with smallest index i that
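For readers skimming the diff: the normalization and ring-scaling logic above can be summarized in a standalone sketch. This is a simplified illustration, not the package's code; the plain `[]uint32` input, the helper names, and the size formula (the real one follows the Envoy logic linked in `newRing` and differs in detail) are all assumptions.

```go
package main

import "fmt"

// normalize divides each weight by the total, mirroring normalizeWeights:
// {3, 3, 4} becomes {0.3, 0.3, 0.4}. It also returns the smallest
// normalized weight, which drives the ring-size calculation.
func normalize(weights []uint32) ([]float64, float64) {
	var sum uint32
	for _, w := range weights {
		sum += w
	}
	out := make([]float64, len(weights))
	min := 1.0
	for i, w := range weights {
		out[i] = float64(w) / float64(sum)
		if out[i] < min {
			min = out[i]
		}
	}
	return out, min
}

// ringSize scales the ring so that the least-weighted host gets at least
// one entry, then clamps the result to [minSize, maxSize].
func ringSize(minWeight float64, minSize, maxSize uint64) uint64 {
	size := uint64(1.0/minWeight) + 1 // enough slots for the smallest weight
	if size < minSize {
		return minSize
	}
	if size > maxSize {
		return maxSize
	}
	return size
}

func main() {
	nw, min := normalize([]uint32{3, 3, 4})
	fmt.Println(nw, min)                 // [0.3 0.3 0.4] 0.3
	fmt.Println(ringSize(min, 10, 4096)) // 10 (clamped up to minSize)
}
```

The change in this file makes both helpers infallible: with a non-empty map and `getWeightAttribute` defaulting absent weights to 1, the weight sum can never be zero, so the `error` return values could be dropped.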
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ring_test.go
@@ -52,7 +52,7 @@ func (s) TestRingNew(t *testing.T) {
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
r, _ := newRing(testSubConnMap, min, max)
r := newRing(testSubConnMap, min, max)
totalCount := len(r.items)
if totalCount < int(min) || totalCount > int(max) {
t.Fatalf("unexpect size %v, want min %v, max %v", totalCount, min, max)
@@ -82,7 +82,7 @@ func equalApproximately(x, y float64) bool {
}

func (s) TestRingPick(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
@@ -100,7 +100,7 @@ func (s) TestRingPick(t *testing.T) {
}

func (s) TestRingNext(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)

for _, e := range r.items {
ne := r.next(e)
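The pick exercised by these tests is the binary search described in `ring.go`: find the first entry whose hash is at least the request hash, wrapping to index 0 past the end of the ring. A minimal sketch of that idea using `sort.Search` (the `entry` type and its fields are stand-ins for the package's `ringEntry`, not its actual definition):

```go
package main

import (
	"fmt"
	"sort"
)

// entry stands in for ringEntry: a hash point on the ring plus the index
// of the backend that owns it.
type entry struct {
	hash uint64
	idx  int
}

// pick returns the first entry with hash >= h. When h is larger than
// every entry's hash, it wraps around to the start of the ring.
func pick(items []entry, h uint64) entry {
	i := sort.Search(len(items), func(i int) bool { return items[i].hash >= h })
	if i == len(items) {
		i = 0 // wrap: hashes past the last entry map to the first one
	}
	return items[i]
}

func main() {
	ring := []entry{{hash: 100, idx: 0}, {hash: 200, idx: 1}, {hash: 300, idx: 2}}
	fmt.Println(pick(ring, 150)) // {200 1}
	fmt.Println(pick(ring, 301)) // {100 0} (wrapped around)
}
```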
44 changes: 24 additions & 20 deletions xds/internal/balancer/ringhash/ringhash.go
@@ -259,29 +259,22 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {

func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
if b.config == nil {
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
b.config = newConfig
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}

// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
if b.updateAddresses(s.ResolverState.Addresses) {
// If addresses were updated, no matter whether it resulted in SubConn
// creation/deletion, or just weight update, we will need to regenerate
// the ring.
var err error
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err))
return balancer.ErrBadResolverState
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
// If addresses were updated, whether it resulted in SubConn
// creation/deletion, or just weight update, we need to regenerate the ring
// and send a new picker.
regenerateRing := b.updateAddresses(s.ResolverState.Addresses)

// If the ring configuration has changed, we need to regenerate the ring and
// send a new picker.
if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize {
regenerateRing = true
}
b.config = newConfig

// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also record this as a resolver error, so when
@@ -291,6 +284,17 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}

if regenerateRing {
// Ring creation is guaranteed to not fail because we call newRing()
// with a non-empty subConns map.
b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

// Successful resolution; clear resolver error and return nil.
b.resolverErr = nil
Contributor:

Again, I don't know if this breaks anything, but the resolver error is no longer cleared if we hit line 295, whereas in master it was. It feels appropriate here, but please triage if this breaks anything.

Contributor Author:

This one looks totally fine to me.

return nil
}

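The substance of the fix: the config is now type-asserted on every update rather than only the first, and the ring is rebuilt whenever either the address set or the ring-size bounds change. A hypothetical helper (not part of the PR; the name is illustrative) that captures the config half of that condition:

```go
// ringSizeChanged reports whether a new LBConfig requires rebuilding the
// ring: either there was no previous config, or the min/max bounds moved.
func ringSizeChanged(oldCfg, newCfg *LBConfig) bool {
	return oldCfg == nil ||
		oldCfg.MinRingSize != newCfg.MinRingSize ||
		oldCfg.MaxRingSize != newCfg.MaxRingSize
}
```

`UpdateClientConnState` above inlines this check and ORs it with the result of `updateAddresses` before deciding to call `newRing` and push a fresh picker.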
42 changes: 38 additions & 4 deletions xds/internal/balancer/ringhash/ringhash_test.go
@@ -108,6 +108,40 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash
// LB policy receives new configuration which specifies new values for the ring
// min and max sizes. The test verifies that a new ring is created and a new
// picker is sent to the ClientConn.
func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {
origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest`
newMinRingSize, newMaxRingSize := 20, 100

addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}}
cc, b, p1 := setupTest(t, addrs)
ring1 := p1.(*picker).ring
if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize)
}

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)},
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}

var ring2 *ring
select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for a picker update after a configuration update")
case p2 := <-cc.NewPickerCh:
ring2 = p2.(*picker).ring
}
if ringSize := len(ring2.items); ringSize < newMinRingSize || ringSize > newMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize)
}
}

func (s) TestOneSubConn(t *testing.T) {
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1})
@@ -320,7 +354,7 @@ func (s) TestAddrWeightChange(t *testing.T) {

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: wantAddrs},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
@@ -336,7 +370,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
}},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
@@ -359,7 +393,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
resolver.Address{Addr: testBackendAddrStrs[1]},
weightedroundrobin.AddrInfo{Weight: 2}),
}},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
@@ -505,7 +539,7 @@ func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}