From 217e9aee54c46eb5d5789ea9dc4777037d0d95ac Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 2 Aug 2022 10:57:52 -0700 Subject: [PATCH 1/5] ringhash: handle config updates properly --- xds/internal/balancer/ringhash/ringhash.go | 52 +++++++++++-------- .../balancer/ringhash/ringhash_test.go | 39 ++++++++++++-- 2 files changed, 66 insertions(+), 25 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 3e06fc4eb6e..7e8d761e796 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -259,38 +259,48 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error { b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig)) - if b.config == nil { - newConfig, ok := s.BalancerConfig.(*LBConfig) - if !ok { - return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) - } - b.config = newConfig + newConfig, ok := s.BalancerConfig.(*LBConfig) + if !ok { + return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } - // Successful resolution; clear resolver error and ensure we return nil. - b.resolverErr = nil + // If resolver state contains no addresses, return an error so ClientConn + // will trigger re-resolve. Also records this as an resolver error, so when + // the overall state turns transient failure, the error message will have + // the zero address information. + if len(s.ResolverState.Addresses) == 0 { + b.ResolverError(errors.New("produced zero addresses")) + return balancer.ErrBadResolverState + } + + // If the ring configuration has changed, we need to regenerate the ring and + // send a new picker. + var regenerateRing bool + if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize { + regenerateRing = true + } + b.config = newConfig + + // If addresses were updated, whether it resulted in SubConn + // creation/deletion, or just weight update, we need to regenerate the ring + // and send a new picker. if b.updateAddresses(s.ResolverState.Addresses) { - // If addresses were updated, no matter whether it resulted in SubConn - // creation/deletion, or just weight update, we will need to regenerate - // the ring. - var err error - b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) + regenerateRing = true + } + + if regenerateRing { + ring, err := newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) if err != nil { b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err)) return balancer.ErrBadResolverState } + b.ring = ring b.regeneratePicker() b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) } - // If resolver state contains no addresses, return an error so ClientConn - // will trigger re-resolve. Also records this as an resolver error, so when - // the overall state turns transient failure, the error message will have - // the zero address information. - if len(s.ResolverState.Addresses) == 0 { - b.ResolverError(errors.New("produced zero addresses")) - return balancer.ErrBadResolverState - } + // Successful resolution; clear resolver error and return nil. + b.resolverErr = nil return nil } diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index 02302321ce5..5ac88423079 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -108,6 +108,37 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } +// TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash +// LB policy receives new configuration which specifies new values for the ring +// min and max sizes. The test verifies that a new ring is created and a new +// picker is pushed on the channel. +func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) { + addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}} + cc, b, p0 := setupTest(t, addrs) + ring0 := p0.(*picker).ring + if ringSize := len(ring0.items); ringSize < 1 || ringSize > 10 { + t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, 1, 10) + } + + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: &LBConfig{MinRingSize: 20, MaxRingSize: 100}, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + + var ring1 *ring + select { + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for a picker update after a configuration update") + case p1 := <-cc.NewPickerCh: + ring1 = p1.(*picker).ring + } + if ringSize := len(ring1.items); ringSize < 20 || ringSize > 100 { + t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, 20, 100) + } +} + func (s) TestOneSubConn(t *testing.T) { wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]} cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1}) @@ -320,7 +351,7 @@ func (s) TestAddrWeightChange(t *testing.T) { if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: wantAddrs}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } @@ -336,7 +367,7 @@ func (s) TestAddrWeightChange(t *testing.T) { {Addr: testBackendAddrStrs[0]}, {Addr: testBackendAddrStrs[1]}, }}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } @@ -359,7 +390,7 @@ func (s) TestAddrWeightChange(t *testing.T) { resolver.Address{Addr: testBackendAddrStrs[1]}, weightedroundrobin.AddrInfo{Weight: 2}), }}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } @@ -505,7 +536,7 @@ func (s) TestAddrBalancerAttributesChange(t *testing.T) { addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})} if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs2}, - BalancerConfig: nil, + BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } From 1814745e215e1d10e2194f8885632dc825fadecf Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 2 Aug 2022 14:37:07 -0700 Subject: [PATCH 2/5] simplify the logic to determine if ring has to be regenerated --- xds/internal/balancer/ringhash/ringhash.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 7e8d761e796..2d1ffc9725d 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -273,21 +273,18 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err return balancer.ErrBadResolverState } + // If addresses were updated, whether it resulted in SubConn + // creation/deletion, or just weight update, we need to regenerate the ring + // and send a new picker. + regenerateRing := b.updateAddresses(s.ResolverState.Addresses) + // If the ring configuration has changed, we need to regenerate the ring and // send a new picker. - var regenerateRing bool if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize { regenerateRing = true } b.config = newConfig - // If addresses were updated, whether it resulted in SubConn - // creation/deletion, or just weight update, we need to regenerate the ring - // and send a new picker. - if b.updateAddresses(s.ResolverState.Addresses) { - regenerateRing = true - } - if regenerateRing { ring, err := newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) if err != nil { From 1a9db40d7a5a91dc1b418b6cca612f7a5f09c652 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 2 Aug 2022 14:42:42 -0700 Subject: [PATCH 3/5] rename vars/consts --- .../balancer/ringhash/ringhash_test.go | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index 5ac88423079..e5b10556e98 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -111,31 +111,34 @@ func Test(t *testing.T) { // TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash // LB policy receives new configuration which specifies new values for the ring // min and max sizes. The test verifies that a new ring is created and a new -// picker is pushed on the channel. +// picker is sent to the ClientConn. func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) { + origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest` + newMinRingSize, newMaxRingSize := 20, 100 + addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}} - cc, b, p0 := setupTest(t, addrs) - ring0 := p0.(*picker).ring - if ringSize := len(ring0.items); ringSize < 1 || ringSize > 10 { - t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, 1, 10) + cc, b, p1 := setupTest(t, addrs) + ring1 := p1.(*picker).ring + if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize { + t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize) } if err := b.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{Addresses: addrs}, - BalancerConfig: &LBConfig{MinRingSize: 20, MaxRingSize: 100}, + BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)}, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } - var ring1 *ring + var ring2 *ring select { case <-time.After(defaultTestTimeout): t.Fatal("Timeout when waiting for a picker update after a configuration update") - case p1 := <-cc.NewPickerCh: - ring1 = p1.(*picker).ring + case p2 := <-cc.NewPickerCh: + ring2 = p2.(*picker).ring } - if ringSize := len(ring1.items); ringSize < 20 || ringSize > 100 { - t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, 20, 100) + if ringSize := len(ring2.items); ringSize < newMinRingSize || ringSize > newMaxRingSize { + t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize) } } From fbed0581af507f9a891f04e2201d2eca1acfb8c0 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 2 Aug 2022 15:56:20 -0700 Subject: [PATCH 4/5] ring creation should never fail --- xds/internal/balancer/ringhash/ring.go | 36 ++++++++++----------- xds/internal/balancer/ringhash/ring_test.go | 6 ++-- xds/internal/balancer/ringhash/ringhash.go | 9 ++---- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index 5e8d881006e..7536017629c 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -19,7 +19,6 @@ package ringhash import ( - "fmt" "math" "sort" "strconv" @@ -64,12 +63,12 @@ type ringEntry struct { // // To pick from a ring, a binary search will be done for the given target hash, // and first item with hash >= given hash will be returned. -func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) { +// +// Must be called with a non-empty subConns map. +func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ring { // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 - normalizedWeights, minWeight, err := normalizeWeights(subConns) - if err != nil { - return nil, err - } + normalizedWeights, minWeight := normalizeWeights(subConns) + // Normalized weights for {3,3,4} is {0.3,0.3,0.4}. // Scale up the size of the ring such that the least-weighted host gets a @@ -106,30 +105,29 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*r for i, ii := range items { ii.idx = i } - return &ring{items: items}, nil + return &ring{items: items} } // normalizeWeights divides all the weights by the sum, so that the total weight // is 1. -func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) { +// +// Must be called with a non-empty subConns map. +func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) { + var weightSum float64 keys := subConns.Keys() - if len(keys) == 0 { - return nil, 0, fmt.Errorf("number of subconns is 0") - } - var weightSum uint32 for _, a := range keys { - weightSum += getWeightAttribute(a) - } - if weightSum == 0 { - return nil, 0, fmt.Errorf("total weight of all subconns is 0") + weightSum += float64(getWeightAttribute(a)) } - weightSumF := float64(weightSum) ret := make([]subConnWithWeight, 0, len(keys)) min := float64(1.0) for _, a := range keys { v, _ := subConns.Get(a) scInfo := v.(*subConn) - nw := float64(getWeightAttribute(a)) / weightSumF + // getWeightAttribute() returns 1 if the weight attribute is not found + // on the address. And since this function is guaranteed to be called + // with a non-empty subConns map, weightSum is guaranteed to be + // non-zero. So, we need not worry about divide by zero error here. + nw := float64(getWeightAttribute(a)) / weightSum ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw}) if nw < min { min = nw @@ -142,7 +140,7 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float // where an address is added and then removed, the RPCs will still pick the // same old SubConn. sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr }) - return ret, min, nil + return ret, min } // pick does a binary search. It returns the item with smallest index i that diff --git a/xds/internal/balancer/ringhash/ring_test.go b/xds/internal/balancer/ringhash/ring_test.go index 20184ab8d20..b1d98760990 100644 --- a/xds/internal/balancer/ringhash/ring_test.go +++ b/xds/internal/balancer/ringhash/ring_test.go @@ -52,7 +52,7 @@ func (s) TestRingNew(t *testing.T) { for _, min := range []uint64{3, 4, 6, 8} { for _, max := range []uint64{20, 8} { t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) { - r, _ := newRing(testSubConnMap, min, max) + r := newRing(testSubConnMap, min, max) totalCount := len(r.items) if totalCount < int(min) || totalCount > int(max) { t.Fatalf("unexpect size %v, want min %v, max %v", totalCount, min, max) @@ -82,7 +82,7 @@ func equalApproximately(x, y float64) bool { } func (s) TestRingPick(t *testing.T) { - r, _ := newRing(testSubConnMap, 10, 20) + r := newRing(testSubConnMap, 10, 20) for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} { t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) { e := r.pick(h) @@ -100,7 +100,7 @@ func (s) TestRingPick(t *testing.T) { } func (s) TestRingNext(t *testing.T) { - r, _ := newRing(testSubConnMap, 10, 20) + r := newRing(testSubConnMap, 10, 20) for _, e := range r.items { ne := r.next(e) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 2d1ffc9725d..1971863a4a5 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -286,12 +286,9 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err b.config = newConfig if regenerateRing { - ring, err := newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) - if err != nil { - b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err)) - return balancer.ErrBadResolverState - } - b.ring = ring + // Ring creation is guaranteed to not fail because we call newRing() + // with a non-empty subConns map. + b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) b.regeneratePicker() b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) } From 6a45d8ee1bf5d211ddcb5a756c6d58750b1859bd Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 3 Aug 2022 16:52:17 -0700 Subject: [PATCH 5/5] review comments --- xds/internal/balancer/ringhash/ring.go | 8 ++++---- xds/internal/balancer/ringhash/ringhash.go | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index 7536017629c..71d31eaeb8b 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -113,10 +113,10 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ri // // Must be called with a non-empty subConns map. func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) { - var weightSum float64 + var weightSum uint32 keys := subConns.Keys() for _, a := range keys { - weightSum += float64(getWeightAttribute(a)) + weightSum += getWeightAttribute(a) } ret := make([]subConnWithWeight, 0, len(keys)) min := float64(1.0) @@ -126,8 +126,8 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float // getWeightAttribute() returns 1 if the weight attribute is not found // on the address. And since this function is guaranteed to be called // with a non-empty subConns map, weightSum is guaranteed to be - // non-zero. So, we need not worry about divide by zero error here. - nw := float64(getWeightAttribute(a)) / weightSum + // non-zero. So, we need not worry about divide a by zero error here. + nw := float64(getWeightAttribute(a)) / float64(weightSum) ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw}) if nw < min { min = nw diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 1971863a4a5..e2ad49fca4a 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -264,15 +264,6 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) } - // If resolver state contains no addresses, return an error so ClientConn - // will trigger re-resolve. Also records this as an resolver error, so when - // the overall state turns transient failure, the error message will have - // the zero address information. - if len(s.ResolverState.Addresses) == 0 { - b.ResolverError(errors.New("produced zero addresses")) - return balancer.ErrBadResolverState - } - // If addresses were updated, whether it resulted in SubConn // creation/deletion, or just weight update, we need to regenerate the ring // and send a new picker. @@ -285,6 +276,15 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err } b.config = newConfig + // If resolver state contains no addresses, return an error so ClientConn + // will trigger re-resolve. Also records this as an resolver error, so when + // the overall state turns transient failure, the error message will have + // the zero address information. + if len(s.ResolverState.Addresses) == 0 { + b.ResolverError(errors.New("produced zero addresses")) + return balancer.ErrBadResolverState + } + if regenerateRing { // Ring creation is guaranteed to not fail because we call newRing() // with a non-empty subConns map.