Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: don't recreate subConns when update doesn't change address information #5431

Merged
merged 7 commits into from Jul 7, 2022
29 changes: 15 additions & 14 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -43,8 +43,8 @@ type ringEntry struct {
sc *subConn
}

// newRing creates a ring from the subConns. The ring size is limited by the
// passed in max/min.
// newRing creates a ring from the subConns stored in the AddressMap. The ring
// size is limited by the passed in max/min.
//
// ring entries will be created for each subConn, and subConn with high weight
// (specified by the address) may have multiple entries.
Expand All @@ -64,7 +64,7 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and first item with hash >= given hash will be returned.
func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) {
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
Expand Down Expand Up @@ -111,26 +111,27 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns map[resolver.Address]*subConn) (_ []subConnWithWeight, min float64, _ error) {
if len(subConns) == 0 {
func normalizeWeights(subConns *resolver.AddressMap) (_ []subConnWithWeight, min float64, _ error) {
if subConns.Len() == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
var weightSum uint32
for a := range subConns {
// The address weight was moved from attributes to the Metadata field.
// This is necessary (all the attributes need to be stripped) for the
// balancer to detect identical {address+weight} combination.
weightSum += a.Metadata.(uint32)
for _, a := range subConns.Keys() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's store Keys() in a local variable, since we range over it again later. We can also get it on the first line and use len(keys) instead of subConns.Len().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// The address weight was moved from `BalancerAttributes` field to the
// `Attributes` field in `updateAddresses()` method.
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(subConns))
ret := make([]subConnWithWeight, 0, subConns.Len())
min = math.MaxFloat64
for a, sc := range subConns {
nw := float64(a.Metadata.(uint32)) / weightSumF
ret = append(ret, subConnWithWeight{sc: sc, weight: nw})
for _, a := range subConns.Keys() {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
}
Expand Down
39 changes: 18 additions & 21 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -27,22 +27,27 @@ import (
"google.golang.org/grpc/resolver"
)

func testAddr(addr string, weight uint32) resolver.Address {
return resolver.Address{Addr: addr, Metadata: weight}
}
var testAddrs []resolver.Address
var testSubConnMap *resolver.AddressMap

func (s) TestRingNew(t *testing.T) {
testAddrs := []resolver.Address{
func init() {
testAddrs = []resolver.Address{
testAddr("a", 3),
testAddr("b", 3),
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
}

func testAddr(addr string, weight uint32) resolver.Address {
return setWeightAttribute(resolver.Address{Addr: addr}, weight)
}

func (s) TestRingNew(t *testing.T) {
var totalWeight float64 = 10
testSubConnMap := map[resolver.Address]*subConn{
testAddr("a", 3): {addr: "a"},
testAddr("b", 3): {addr: "b"},
testAddr("c", 4): {addr: "c"},
}
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
Expand All @@ -59,7 +64,7 @@ func (s) TestRingNew(t *testing.T) {
}
}
got := float64(count) / float64(totalCount)
want := float64(a.Metadata.(uint32)) / totalWeight
want := float64(getWeightAttribute(a)) / totalWeight
if !equalApproximately(got, want) {
t.Fatalf("unexpected item weight in ring: %v != %v", got, want)
}
Expand All @@ -76,11 +81,7 @@ func equalApproximately(x, y float64) bool {
}

func (s) TestRingPick(t *testing.T) {
r, _ := newRing(map[resolver.Address]*subConn{
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
}, 10, 20)
r, _ := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
Expand All @@ -98,11 +99,7 @@ func (s) TestRingPick(t *testing.T) {
}

func (s) TestRingNext(t *testing.T) {
r, _ := newRing(map[resolver.Address]*subConn{
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
}, 10, 20)
r, _ := newRing(testSubConnMap, 10, 20)

for _, e := range r.items {
ne := r.next(e)
Expand Down
72 changes: 40 additions & 32 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -47,7 +47,7 @@ type bb struct{}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &ringhashBalancer{
cc: cc,
subConns: make(map[resolver.Address]*subConn),
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
}
Expand Down Expand Up @@ -180,7 +180,18 @@ type ringhashBalancer struct {

config *LBConfig

subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses)
// The key for this map is a resolver.Address with the following
// modification:
// - `Attributes` field is cleared and rewritten with a single attribute
// containing the weight of the address. The `AddressMap` type uses the
// `Attributes` field to determine equality, but ignores the
// `BalancerAttributes` field. Hence, we copy over the weight of the
// address from the latter to the former.
easwars marked this conversation as resolved.
Show resolved Hide resolved
// The ringhash LB policy is concerned only with the address value and its
// weight, when comparing addresses received as part of a ClientConnUpdate.
//
// The value type stored in this map is a `*subConn`.
subConns *resolver.AddressMap
scStates map[balancer.SubConn]*subConn

// ring is always in sync with subConns. When subConns change, a new ring is
Expand Down Expand Up @@ -208,55 +219,51 @@ type ringhashBalancer struct {
// SubConn states are Idle.
func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
var addrsUpdated bool
// addrsSet is the set converted from addrs, it's used for quick lookup of
// an address.
//
// Addresses in this map all have attributes stripped, but metadata set to
// the weight. So that weight change can be detected.
//
// TODO: this won't be necessary if there are ways to compare address
// attributes.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
aNoAttrs := a
// Strip attributes but set Metadata to the weight.
aNoAttrs.Attributes = nil
w := weightedroundrobin.GetAddrInfo(a).Weight
if w == 0 {
// addrsSet is the set converted from addrs, it's used for quick lookup of an
// address. Key type here is the same as that of the `subConns` map.
addrsSet := resolver.NewAddressMap()
for _, addr := range addrs {
addrInfo := weightedroundrobin.GetAddrInfo(addr)
if addrInfo.Weight == 0 {

// If weight is not set, use 1.
w = 1
addrInfo.Weight = 1
}
aNoAttrs.Metadata = w
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
modifiedAddr := addr
modifiedAddr.Attributes = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why clear Attributes? That seems wrong, since Attributes will affect subchannel creation. I think we should be clearing BalancerAttributes if anything (or just leave it alone since it doesn't matter for comparisons?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you see line 240, we use the original addr for subchannel creation, thereby passing all the attributes that we received. This modifiedAddr is used only to key the subConns map and here we are interested only in the address weight, which is currently stored as part of the BalancerAttributes field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed this to not ignore Attributes.

modifiedAddr = setWeightAttribute(modifiedAddr, addrInfo.Weight)
addrsSet.Set(modifiedAddr, true)
if val, ok := b.subConns.Get(modifiedAddr); !ok {
// When creating SubConn, the original address with attributes is
// passed through. So that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: a.Addr, sc: sc}
scs := &subConn{addr: addr.Addr, sc: sc}
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns[aNoAttrs] = scs
b.subConns.Set(modifiedAddr, scs)
b.scStates[sc] = scs
addrsUpdated = true
} else {
// Always update the subconn's address in case the attributes
// changed. The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a})
scInfo := val.(*subConn)
b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{addr})
}
}
for a, scInfo := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
for _, addr := range b.subConns.Keys() {
// addr was removed by resolver.
if _, ok := addrsSet.Get(addr); !ok {
v, _ := b.subConns.Get(addr)
scInfo := v.(*subConn)
b.cc.RemoveSubConn(scInfo.sc)
delete(b.subConns, a)
b.subConns.Delete(addr)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down Expand Up @@ -304,7 +311,7 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err

func (b *ringhashBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
if b.subConns.Len() == 0 {
b.state = connectivity.TransientFailure
}

Expand Down Expand Up @@ -392,7 +399,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
// attempting to connect, we need to trigger one. But since the deleted
// SubConn will eventually send a shutdown update, this code will run
// and trigger the next SubConn to connect.
for _, sc := range b.subConns {
for _, v := range b.subConns.Values() {
sc := v.(*subConn)
if sc.isAttemptingToConnect() {
return
}
Expand Down
24 changes: 24 additions & 0 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
)

var (
Expand Down Expand Up @@ -491,3 +492,26 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
})
}
}

// TestAddrBalancerAttributesChange tests the case where the ringhash balancer
// receives a ClientConnUpdate with the same config and addresses as received in
// the previous update. Although the `BalancerAttributes` contents are the same,
// the pointer is different. This test verifies that subConns are not recreated
// in this scenario.
func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
cc, b, _ := setupTest(t, addrs1)

addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: nil,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
select {
case <-cc.NewSubConnCh:
t.Fatal("new subConn created for an update with the same addresses")
case <-time.After(defaultTestShortTimeout):
}
}
40 changes: 40 additions & 0 deletions xds/internal/balancer/ringhash/weight_attribute.go
@@ -0,0 +1,40 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ringhash

import "google.golang.org/grpc/resolver"

// weightAttributeKey is used as the attribute key when storing the address
// weight in the `Attributes` field of the address.
type weightAttributeKey struct{}

// setWeightAttribute returns a copy of addr in which the Attributes field is
// updated with weight.
func setWeightAttribute(addr resolver.Address, weight uint32) resolver.Address {
addr.Attributes = addr.Attributes.WithValue(weightAttributeKey{}, weight)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BalancerAttributes?

Hold up, are we saying we want an AddressMap that is sensitive to BalancerAttributes (or at least this one), i.e. attributes that do not impact subchannel uniqueness? Because that's not what AddressMap is, currently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not change the logic here wrt to what was being used from Attributes and from BalancerAttributes. And I sort of get the idea that the ringhash LB policy cares only about address weight when deciding whether it needs to recreate subConns.

But I also seem to get your point that it doesn't seem to be right to be ignoring Attributes while deciding on subchannel uniqueness. Let me check what the other implementations do. Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed the subConns map to work with the actual address. No modifications to either Attributes or BalancerAttributes.

return addr
}

// getWeightAttribute returns the weight stored in the Attributes fields of
// addr.
func getWeightAttribute(addr resolver.Address) uint32 {
v := addr.Attributes.Value(weightAttributeKey{})
weight, _ := v.(uint32)
return weight
}