Skip to content

Commit

Permalink
ringhash: don't recreate subConns when update doesn't change address …
Browse files Browse the repository at this point in the history
…information (#5431)
  • Loading branch information
easwars committed Jul 7, 2022
1 parent a6dcb71 commit 0d04c6f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 82 deletions.
32 changes: 16 additions & 16 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -43,8 +43,8 @@ type ringEntry struct {
sc *subConn
}

// newRing creates a ring from the subConns. The ring size is limited by the
// passed in max/min.
// newRing creates a ring from the subConns stored in the AddressMap. The ring
// size is limited by the passed in max/min.
//
// ring entries will be created for each subConn, and subConn with high weight
// (specified by the address) may have multiple entries.
Expand All @@ -64,7 +64,7 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and first item with hash >= given hash will be returned.
func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) {
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
Expand Down Expand Up @@ -95,7 +95,7 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui
for _, scw := range normalizedWeights {
targetIdx += scale * scw.weight
for float64(idx) < targetIdx {
h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(len(items)))
h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(idx))
items = append(items, &ringEntry{idx: idx, hash: h, sc: scw.sc})
idx++
}
Expand All @@ -111,26 +111,26 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns map[resolver.Address]*subConn) (_ []subConnWithWeight, min float64, _ error) {
if len(subConns) == 0 {
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
keys := subConns.Keys()
if len(keys) == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
var weightSum uint32
for a := range subConns {
// The address weight was moved from attributes to the Metadata field.
// This is necessary (all the attributes need to be stripped) for the
// balancer to detect identical {address+weight} combination.
weightSum += a.Metadata.(uint32)
for _, a := range keys {
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(subConns))
min = math.MaxFloat64
for a, sc := range subConns {
nw := float64(a.Metadata.(uint32)) / weightSumF
ret = append(ret, subConnWithWeight{sc: sc, weight: nw})
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
}
Expand Down
40 changes: 19 additions & 21 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -24,25 +24,31 @@ import (
"testing"

xxhash "github.com/cespare/xxhash/v2"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/resolver"
)

func testAddr(addr string, weight uint32) resolver.Address {
return resolver.Address{Addr: addr, Metadata: weight}
}
var testAddrs []resolver.Address
var testSubConnMap *resolver.AddressMap

func (s) TestRingNew(t *testing.T) {
testAddrs := []resolver.Address{
func init() {
testAddrs = []resolver.Address{
testAddr("a", 3),
testAddr("b", 3),
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
}

func testAddr(addr string, weight uint32) resolver.Address {
return weightedroundrobin.SetAddrInfo(resolver.Address{Addr: addr}, weightedroundrobin.AddrInfo{Weight: weight})
}

func (s) TestRingNew(t *testing.T) {
var totalWeight float64 = 10
testSubConnMap := map[resolver.Address]*subConn{
testAddr("a", 3): {addr: "a"},
testAddr("b", 3): {addr: "b"},
testAddr("c", 4): {addr: "c"},
}
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
Expand All @@ -59,7 +65,7 @@ func (s) TestRingNew(t *testing.T) {
}
}
got := float64(count) / float64(totalCount)
want := float64(a.Metadata.(uint32)) / totalWeight
want := float64(getWeightAttribute(a)) / totalWeight
if !equalApproximately(got, want) {
t.Fatalf("unexpected item weight in ring: %v != %v", got, want)
}
Expand All @@ -76,11 +82,7 @@ func equalApproximately(x, y float64) bool {
}

func (s) TestRingPick(t *testing.T) {
r, _ := newRing(map[resolver.Address]*subConn{
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
}, 10, 20)
r, _ := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
Expand All @@ -98,11 +100,7 @@ func (s) TestRingPick(t *testing.T) {
}

func (s) TestRingNext(t *testing.T) {
r, _ := newRing(map[resolver.Address]*subConn{
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
}, 10, 20)
r, _ := newRing(testSubConnMap, 10, 20)

for _, e := range r.items {
ne := r.next(e)
Expand Down
98 changes: 53 additions & 45 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -47,7 +47,7 @@ type bb struct{}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &ringhashBalancer{
cc: cc,
subConns: make(map[resolver.Address]*subConn),
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
}
Expand All @@ -65,8 +65,9 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
}

type subConn struct {
addr string
sc balancer.SubConn
addr string
weight uint32
sc balancer.SubConn

mu sync.RWMutex
// This is the actual state of this SubConn (as updated by the ClientConn).
Expand Down Expand Up @@ -178,9 +179,8 @@ type ringhashBalancer struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger

config *LBConfig

subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses)
config *LBConfig
subConns *resolver.AddressMap // Map from resolver.Address to `*subConn`.
scStates map[balancer.SubConn]*subConn

// ring is always in sync with subConns. When subConns change, a new ring is
Expand Down Expand Up @@ -208,55 +208,47 @@ type ringhashBalancer struct {
// SubConn states are Idle.
func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
var addrsUpdated bool
// addrsSet is the set converted from addrs, it's used for quick lookup of
// an address.
//
// Addresses in this map all have attributes stripped, but metadata set to
// the weight. So that weight change can be detected.
//
// TODO: this won't be necessary if there are ways to compare address
// attributes.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
aNoAttrs := a
// Strip attributes but set Metadata to the weight.
aNoAttrs.Attributes = nil
w := weightedroundrobin.GetAddrInfo(a).Weight
if w == 0 {
// If weight is not set, use 1.
w = 1
}
aNoAttrs.Metadata = w
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
// When creating SubConn, the original address with attributes is
// passed through. So that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
// addrsSet is the set converted from addrs, used for quick lookup.
addrsSet := resolver.NewAddressMap()
for _, addr := range addrs {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
if val, ok := b.subConns.Get(addr); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: a.Addr, sc: sc}
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns[aNoAttrs] = scs
b.subConns.Set(addr, scs)
b.scStates[sc] = scs
addrsUpdated = true
} else {
// Always update the subconn's address in case the attributes
// changed. The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a})
// We have seen this address before and created a subConn for it. If the
// weight associated with the address has changed, update the subConns map
// with the new weight. This will be used when a new ring is created.
//
// There is no need to call UpdateAddresses on the subConn at this point
// since *only* the weight attribute has changed, and that does not affect
// subConn uniqueness.
scInfo := val.(*subConn)
if oldWeight := scInfo.weight; oldWeight != newWeight {
scInfo.weight = newWeight
b.subConns.Set(addr, scInfo)
// Return true to force recreation of the ring.
addrsUpdated = true
}
}
}
for a, scInfo := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
for _, addr := range b.subConns.Keys() {
// addr was removed by resolver.
if _, ok := addrsSet.Get(addr); !ok {
v, _ := b.subConns.Get(addr)
scInfo := v.(*subConn)
b.cc.RemoveSubConn(scInfo.sc)
delete(b.subConns, a)
b.subConns.Delete(addr)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down Expand Up @@ -304,7 +296,7 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err

func (b *ringhashBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
if b.subConns.Len() == 0 {
b.state = connectivity.TransientFailure
}

Expand Down Expand Up @@ -392,7 +384,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
// attempting to connect, we need to trigger one. But since the deleted
// SubConn will eventually send a shutdown update, this code will run
// and trigger the next SubConn to connect.
for _, sc := range b.subConns {
for _, v := range b.subConns.Values() {
sc := v.(*subConn)
if sc.isAttemptingToConnect() {
return
}
Expand Down Expand Up @@ -485,3 +478,18 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
}
return connectivity.TransientFailure
}

// getWeightAttribute is a convenience function which returns the value of the
// weight attribute stored in the BalancerAttributes field of addr, using the
// weightedroundrobin package.
//
// When used in the xDS context, the weight attribute is guaranteed to be
// non-zero. But, when used in a non-xDS context, the weight attribute could be
// unset. A Default of 1 is used in the latter case.
func getWeightAttribute(addr resolver.Address) uint32 {
w := weightedroundrobin.GetAddrInfo(addr).Weight
if w == 0 {
return 1
}
return w
}
24 changes: 24 additions & 0 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
)

var (
Expand Down Expand Up @@ -491,3 +492,26 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
})
}
}

// TestAddrBalancerAttributesChange tests the case where the ringhash balancer
// receives a ClientConnUpdate with the same config and addresses as received in
// the previous update. Although the `BalancerAttributes` contents are the same,
// the pointer is different. This test verifies that subConns are not recreated
// in this scenario.
func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
cc, b, _ := setupTest(t, addrs1)

addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: nil,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
select {
case <-cc.NewSubConnCh:
t.Fatal("new subConn created for an update with the same addresses")
case <-time.After(defaultTestShortTimeout):
}
}

0 comments on commit 0d04c6f

Please sign in to comment.