Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: handle config updates properly #5557

Merged
merged 5 commits into from Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 17 additions & 19 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -19,7 +19,6 @@
package ringhash

import (
"fmt"
"math"
"sort"
"strconv"
Expand Down Expand Up @@ -64,12 +63,12 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and first item with hash >= given hash will be returned.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
//
// Must be called with a non-empty subConns map.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ring {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
return nil, err
}
normalizedWeights, minWeight := normalizeWeights(subConns)

// Normalized weights for {3,3,4} is {0.3,0.3,0.4}.

// Scale up the size of the ring such that the least-weighted host gets a
Expand Down Expand Up @@ -106,30 +105,29 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*r
for i, ii := range items {
ii.idx = i
}
return &ring{items: items}, nil
return &ring{items: items}
}

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
//
// Must be called with a non-empty subConns map.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing "non-empty" encapsualtes nil and len(0)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right.

func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) {
var weightSum float64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of keeping this uint32, and instead of having another var to convert it like in master, you convert in the denominator float64(weightsum) to avoid cast on line 119. I don't care/don't feel strongly about this but was just proposing a suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

keys := subConns.Keys()
if len(keys) == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
var weightSum uint32
for _, a := range keys {
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
weightSum += float64(getWeightAttribute(a))
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
// getWeightAttribute() returns 1 if the weight attribute is not found
// on the address. And since this function is guaranteed to be called
// with a non-empty subConns map, weightSum is guaranteed to be
// non-zero. So, we need not worry about divide by zero error here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, super super minor nit: "need not worry about a divide"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

nw := float64(getWeightAttribute(a)) / weightSum
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
Expand All @@ -142,7 +140,7 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float
// where an address is added and then removed, the RPCs will still pick the
// same old SubConn.
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr })
return ret, min, nil
return ret, min
}

// pick does a binary search. It returns the item with smallest index i that
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -52,7 +52,7 @@ func (s) TestRingNew(t *testing.T) {
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
r, _ := newRing(testSubConnMap, min, max)
r := newRing(testSubConnMap, min, max)
totalCount := len(r.items)
if totalCount < int(min) || totalCount > int(max) {
t.Fatalf("unexpect size %v, want min %v, max %v", totalCount, min, max)
Expand Down Expand Up @@ -82,7 +82,7 @@ func equalApproximately(x, y float64) bool {
}

func (s) TestRingPick(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
Expand All @@ -100,7 +100,7 @@ func (s) TestRingPick(t *testing.T) {
}

func (s) TestRingNext(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)

for _, e := range r.items {
ne := r.next(e)
Expand Down
22 changes: 8 additions & 14 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -273,28 +273,22 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
return balancer.ErrBadResolverState
}

// If addresses were updated, whether it resulted in SubConn
// creation/deletion, or just weight update, we need to regenerate the ring
// and send a new picker.
regenerateRing := b.updateAddresses(s.ResolverState.Addresses)

// If the ring configuration has changed, we need to regenerate the ring and
// send a new picker.
var regenerateRing bool
if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize {
regenerateRing = true
}
b.config = newConfig

// If addresses were updated, whether it resulted in SubConn
// creation/deletion, or just weight update, we need to regenerate the ring
// and send a new picker.
if b.updateAddresses(s.ResolverState.Addresses) {
regenerateRing = true
}

if regenerateRing {
ring, err := newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err))
return balancer.ErrBadResolverState
}
b.ring = ring
// Ring creation is guaranteed to not fail because we call newRing()
// with a non-empty subConns map.
b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
Expand Down
25 changes: 14 additions & 11 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -111,31 +111,34 @@ func Test(t *testing.T) {
// TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash
// LB policy receives new configuration which specifies new values for the ring
// min and max sizes. The test verifies that a new ring is created and a new
// picker is pushed on the channel.
// picker is sent to the ClientConn.
func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {
origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest`
newMinRingSize, newMaxRingSize := 20, 100

addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}}
cc, b, p0 := setupTest(t, addrs)
ring0 := p0.(*picker).ring
if ringSize := len(ring0.items); ringSize < 1 || ringSize > 10 {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, 1, 10)
cc, b, p1 := setupTest(t, addrs)
ring1 := p1.(*picker).ring
if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize)
}

if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &LBConfig{MinRingSize: 20, MaxRingSize: 100},
BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)},
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}

var ring1 *ring
var ring2 *ring
select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for a picker update after a configuration update")
case p1 := <-cc.NewPickerCh:
ring1 = p1.(*picker).ring
case p2 := <-cc.NewPickerCh:
ring2 = p2.(*picker).ring
}
if ringSize := len(ring1.items); ringSize < 20 || ringSize > 100 {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, 20, 100)
if ringSize := len(ring2.items); ringSize < newMinRingSize || ringSize > newMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize)
}
}

Expand Down