diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index 68e844cfb48..5e8d881006e 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -43,8 +43,8 @@ type ringEntry struct { sc *subConn } -// newRing creates a ring from the subConns. The ring size is limited by the -// passed in max/min. +// newRing creates a ring from the subConns stored in the AddressMap. The ring +// size is limited by the passed in max/min. // // ring entries will be created for each subConn, and subConn with high weight // (specified by the address) may have multiple entries. @@ -64,7 +64,7 @@ type ringEntry struct { // // To pick from a ring, a binary search will be done for the given target hash, // and first item with hash >= given hash will be returned. -func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) { +func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) { // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 normalizedWeights, minWeight, err := normalizeWeights(subConns) if err != nil { @@ -95,7 +95,7 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui for _, scw := range normalizedWeights { targetIdx += scale * scw.weight for float64(idx) < targetIdx { - h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(len(items))) + h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(idx)) items = append(items, &ringEntry{idx: idx, hash: h, sc: scw.sc}) idx++ } @@ -111,26 +111,26 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui // normalizeWeights divides all the weights by the sum, so that the total weight // is 1. -func normalizeWeights(subConns map[resolver.Address]*subConn) (_ []subConnWithWeight, min float64, _ error) { - if len(subConns) == 0 { +func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) { + keys := subConns.Keys() + if len(keys) == 0 { return nil, 0, fmt.Errorf("number of subconns is 0") } var weightSum uint32 - for a := range subConns { - // The address weight was moved from attributes to the Metadata field. - // This is necessary (all the attributes need to be stripped) for the - // balancer to detect identical {address+weight} combination. - weightSum += a.Metadata.(uint32) + for _, a := range keys { + weightSum += getWeightAttribute(a) } if weightSum == 0 { return nil, 0, fmt.Errorf("total weight of all subconns is 0") } weightSumF := float64(weightSum) - ret := make([]subConnWithWeight, 0, len(subConns)) - min = math.MaxFloat64 - for a, sc := range subConns { - nw := float64(a.Metadata.(uint32)) / weightSumF - ret = append(ret, subConnWithWeight{sc: sc, weight: nw}) + ret := make([]subConnWithWeight, 0, len(keys)) + min := float64(1.0) + for _, a := range keys { + v, _ := subConns.Get(a) + scInfo := v.(*subConn) + nw := float64(getWeightAttribute(a)) / weightSumF + ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw}) if nw < min { min = nw } diff --git a/xds/internal/balancer/ringhash/ring_test.go b/xds/internal/balancer/ringhash/ring_test.go index a47215339cf..20184ab8d20 100644 --- a/xds/internal/balancer/ringhash/ring_test.go +++ b/xds/internal/balancer/ringhash/ring_test.go @@ -24,25 +24,31 @@ import ( "testing" xxhash "github.com/cespare/xxhash/v2" + "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/resolver" ) -func testAddr(addr string, weight uint32) resolver.Address { - return resolver.Address{Addr: addr, Metadata: weight} -} +var testAddrs []resolver.Address +var testSubConnMap *resolver.AddressMap -func (s) TestRingNew(t *testing.T) { - testAddrs := []resolver.Address{ +func init() { + testAddrs = []resolver.Address{ testAddr("a", 3), testAddr("b", 3), testAddr("c", 4), } + testSubConnMap = resolver.NewAddressMap() + testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"}) + testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"}) + testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"}) +} + +func testAddr(addr string, weight uint32) resolver.Address { + return weightedroundrobin.SetAddrInfo(resolver.Address{Addr: addr}, weightedroundrobin.AddrInfo{Weight: weight}) +} + +func (s) TestRingNew(t *testing.T) { var totalWeight float64 = 10 - testSubConnMap := map[resolver.Address]*subConn{ - testAddr("a", 3): {addr: "a"}, - testAddr("b", 3): {addr: "b"}, - testAddr("c", 4): {addr: "c"}, - } for _, min := range []uint64{3, 4, 6, 8} { for _, max := range []uint64{20, 8} { t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) { @@ -59,7 +65,7 @@ func (s) TestRingNew(t *testing.T) { } } got := float64(count) / float64(totalCount) - want := float64(a.Metadata.(uint32)) / totalWeight + want := float64(getWeightAttribute(a)) / totalWeight if !equalApproximately(got, want) { t.Fatalf("unexpected item weight in ring: %v != %v", got, want) } @@ -76,11 +82,7 @@ func equalApproximately(x, y float64) bool { } func (s) TestRingPick(t *testing.T) { - r, _ := newRing(map[resolver.Address]*subConn{ - {Addr: "a", Metadata: uint32(3)}: {addr: "a"}, - {Addr: "b", Metadata: uint32(3)}: {addr: "b"}, - {Addr: "c", Metadata: uint32(4)}: {addr: "c"}, - }, 10, 20) + r, _ := newRing(testSubConnMap, 10, 20) for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} { t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) { e := r.pick(h) @@ -98,11 +100,7 @@ func (s) TestRingPick(t *testing.T) { } func (s) TestRingNext(t *testing.T) { - r, _ := newRing(map[resolver.Address]*subConn{ - {Addr: "a", Metadata: uint32(3)}: {addr: "a"}, - {Addr: "b", Metadata: uint32(3)}: {addr: "b"}, - {Addr: "c", Metadata: uint32(4)}: {addr: "c"}, - }, 10, 20) + r, _ := newRing(testSubConnMap, 10, 20) for _, e := range r.items { ne := r.next(e) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 4e9c1772b16..3e06fc4eb6e 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -47,7 +47,7 @@ type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { b := &ringhashBalancer{ cc: cc, - subConns: make(map[resolver.Address]*subConn), + subConns: resolver.NewAddressMap(), scStates: make(map[balancer.SubConn]*subConn), csEvltr: &connectivityStateEvaluator{}, } @@ -65,8 +65,9 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err } type subConn struct { - addr string - sc balancer.SubConn + addr string + weight uint32 + sc balancer.SubConn mu sync.RWMutex // This is the actual state of this SubConn (as updated by the ClientConn). @@ -178,9 +179,8 @@ type ringhashBalancer struct { cc balancer.ClientConn logger *grpclog.PrefixLogger - config *LBConfig - - subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses) + config *LBConfig + subConns *resolver.AddressMap // Map from resolver.Address to `*subConn`. scStates map[balancer.SubConn]*subConn // ring is always in sync with subConns. When subConns change, a new ring is @@ -208,55 +208,47 @@ type ringhashBalancer struct { // SubConn states are Idle. func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { var addrsUpdated bool - // addrsSet is the set converted from addrs, it's used for quick lookup of - // an address. - // - // Addresses in this map all have attributes stripped, but metadata set to - // the weight. So that weight change can be detected. - // - // TODO: this won't be necessary if there are ways to compare address - // attributes. - addrsSet := make(map[resolver.Address]struct{}) - for _, a := range addrs { - aNoAttrs := a - // Strip attributes but set Metadata to the weight. - aNoAttrs.Attributes = nil - w := weightedroundrobin.GetAddrInfo(a).Weight - if w == 0 { - // If weight is not set, use 1. - w = 1 - } - aNoAttrs.Metadata = w - addrsSet[aNoAttrs] = struct{}{} - if scInfo, ok := b.subConns[aNoAttrs]; !ok { - // When creating SubConn, the original address with attributes is - // passed through. So that connection configurations in attributes - // (like creds) will be used. - sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) + // addrsSet is the set converted from addrs, used for quick lookup. + addrsSet := resolver.NewAddressMap() + for _, addr := range addrs { + addrsSet.Set(addr, true) + newWeight := getWeightAttribute(addr) + if val, ok := b.subConns.Get(addr); !ok { + sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) if err != nil { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue } - scs := &subConn{addr: a.Addr, sc: sc} + scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc} scs.setState(connectivity.Idle) b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle) - b.subConns[aNoAttrs] = scs + b.subConns.Set(addr, scs) b.scStates[sc] = scs addrsUpdated = true } else { - // Always update the subconn's address in case the attributes - // changed. The SubConn does a reflect.DeepEqual of the new and old - // addresses. So this is a noop if the current address is the same - // as the old one (including attributes). - b.subConns[aNoAttrs] = scInfo - b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a}) + // We have seen this address before and created a subConn for it. If the + // weight associated with the address has changed, update the subConns map + // with the new weight. This will be used when a new ring is created. + // + // There is no need to call UpdateAddresses on the subConn at this point + // since *only* the weight attribute has changed, and that does not affect + // subConn uniqueness. + scInfo := val.(*subConn) + if oldWeight := scInfo.weight; oldWeight != newWeight { + scInfo.weight = newWeight + b.subConns.Set(addr, scInfo) + // Return true to force recreation of the ring. + addrsUpdated = true + } } } - for a, scInfo := range b.subConns { - // a was removed by resolver. - if _, ok := addrsSet[a]; !ok { + for _, addr := range b.subConns.Keys() { + // addr was removed by resolver. + if _, ok := addrsSet.Get(addr); !ok { + v, _ := b.subConns.Get(addr) + scInfo := v.(*subConn) b.cc.RemoveSubConn(scInfo.sc) - delete(b.subConns, a) + b.subConns.Delete(addr) addrsUpdated = true // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // The entry will be deleted in UpdateSubConnState. @@ -304,7 +296,7 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err func (b *ringhashBalancer) ResolverError(err error) { b.resolverErr = err - if len(b.subConns) == 0 { + if b.subConns.Len() == 0 { b.state = connectivity.TransientFailure } @@ -392,7 +384,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance // attempting to connect, we need to trigger one. But since the deleted // SubConn will eventually send a shutdown update, this code will run // and trigger the next SubConn to connect. - for _, sc := range b.subConns { + for _, v := range b.subConns.Values() { + sc := v.(*subConn) if sc.isAttemptingToConnect() { return } @@ -485,3 +478,18 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne } return connectivity.TransientFailure } + +// getWeightAttribute is a convenience function which returns the value of the +// weight attribute stored in the BalancerAttributes field of addr, using the +// weightedroundrobin package. +// +// When used in the xDS context, the weight attribute is guaranteed to be +// non-zero. But, when used in a non-xDS context, the weight attribute could be +// unset. A Default of 1 is used in the latter case. +func getWeightAttribute(addr resolver.Address) uint32 { + w := weightedroundrobin.GetAddrInfo(addr).Weight + if w == 0 { + return 1 + } + return w +} diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index cac8476b929..02302321ce5 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal" ) var ( @@ -491,3 +492,26 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) { }) } } + +// TestAddrBalancerAttributesChange tests the case where the ringhash balancer +// receives a ClientConnUpdate with the same config and addresses as received in +// the previous update. Although the `BalancerAttributes` contents are the same, +// the pointer is different. This test verifies that subConns are not recreated +// in this scenario. +func (s) TestAddrBalancerAttributesChange(t *testing.T) { + addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})} + cc, b, _ := setupTest(t, addrs1) + + addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})} + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs2}, + BalancerConfig: nil, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + select { + case <-cc.NewSubConnCh: + t.Fatal("new subConn created for an update with the same addresses") + case <-time.After(defaultTestShortTimeout): + } +}