Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: don't recreate subConns when update doesn't change address information #5431

Merged
merged 7 commits into from Jul 7, 2022
32 changes: 16 additions & 16 deletions xds/internal/balancer/ringhash/ring.go
Expand Up @@ -43,8 +43,8 @@ type ringEntry struct {
sc *subConn
}

// newRing creates a ring from the subConns. The ring size is limited by the
// passed in max/min.
// newRing creates a ring from the subConns stored in the AddressMap. The ring
// size is limited by the passed in max/min.
//
// ring entries will be created for each subConn, and subConn with high weight
// (specified by the address) may have multiple entries.
Expand All @@ -64,7 +64,7 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and first item with hash >= given hash will be returned.
func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) {
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
Expand Down Expand Up @@ -95,7 +95,7 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui
for _, scw := range normalizedWeights {
targetIdx += scale * scw.weight
for float64(idx) < targetIdx {
h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(len(items)))
h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(idx))
items = append(items, &ringEntry{idx: idx, hash: h, sc: scw.sc})
idx++
}
Expand All @@ -111,26 +111,26 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns map[resolver.Address]*subConn) (_ []subConnWithWeight, min float64, _ error) {
if len(subConns) == 0 {
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
keys := subConns.Keys()
if len(keys) == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
var weightSum uint32
for a := range subConns {
// The address weight was moved from attributes to the Metadata field.
// This is necessary (all the attributes need to be stripped) for the
// balancer to detect identical {address+weight} combination.
weightSum += a.Metadata.(uint32)
for _, a := range keys {
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(subConns))
min = math.MaxFloat64
for a, sc := range subConns {
nw := float64(a.Metadata.(uint32)) / weightSumF
ret = append(ret, subConnWithWeight{sc: sc, weight: nw})
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
}
Expand Down
40 changes: 19 additions & 21 deletions xds/internal/balancer/ringhash/ring_test.go
Expand Up @@ -24,25 +24,31 @@ import (
"testing"

xxhash "github.com/cespare/xxhash/v2"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/resolver"
)

func testAddr(addr string, weight uint32) resolver.Address {
return resolver.Address{Addr: addr, Metadata: weight}
}
var testAddrs []resolver.Address
var testSubConnMap *resolver.AddressMap

func (s) TestRingNew(t *testing.T) {
testAddrs := []resolver.Address{
func init() {
testAddrs = []resolver.Address{
testAddr("a", 3),
testAddr("b", 3),
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
}

func testAddr(addr string, weight uint32) resolver.Address {
return weightedroundrobin.SetAddrInfo(resolver.Address{Addr: addr}, weightedroundrobin.AddrInfo{Weight: weight})
}

func (s) TestRingNew(t *testing.T) {
var totalWeight float64 = 10
testSubConnMap := map[resolver.Address]*subConn{
testAddr("a", 3): {addr: "a"},
testAddr("b", 3): {addr: "b"},
testAddr("c", 4): {addr: "c"},
}
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
Expand All @@ -59,7 +65,7 @@ func (s) TestRingNew(t *testing.T) {
}
}
got := float64(count) / float64(totalCount)
want := float64(a.Metadata.(uint32)) / totalWeight
want := float64(getWeightAttribute(a)) / totalWeight
if !equalApproximately(got, want) {
t.Fatalf("unexpected item weight in ring: %v != %v", got, want)
}
Expand All @@ -76,11 +82,7 @@ func equalApproximately(x, y float64) bool {
}

func (s) TestRingPick(t *testing.T) {
r, _ := newRing(map[resolver.Address]*subConn{
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
}, 10, 20)
r, _ := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
Expand All @@ -98,11 +100,7 @@ func (s) TestRingPick(t *testing.T) {
}

func (s) TestRingNext(t *testing.T) {
r, _ := newRing(map[resolver.Address]*subConn{
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
}, 10, 20)
r, _ := newRing(testSubConnMap, 10, 20)

for _, e := range r.items {
ne := r.next(e)
Expand Down
98 changes: 53 additions & 45 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -47,7 +47,7 @@ type bb struct{}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &ringhashBalancer{
cc: cc,
subConns: make(map[resolver.Address]*subConn),
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
}
Expand All @@ -65,8 +65,9 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
}

type subConn struct {
addr string
sc balancer.SubConn
addr string
weight uint32
sc balancer.SubConn

mu sync.RWMutex
// This is the actual state of this SubConn (as updated by the ClientConn).
Expand Down Expand Up @@ -178,9 +179,8 @@ type ringhashBalancer struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger

config *LBConfig

subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses)
config *LBConfig
subConns *resolver.AddressMap // Map from resolver.Address to `*subConn`.
scStates map[balancer.SubConn]*subConn

// ring is always in sync with subConns. When subConns change, a new ring is
Expand Down Expand Up @@ -208,55 +208,47 @@ type ringhashBalancer struct {
// SubConn states are Idle.
func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
var addrsUpdated bool
// addrsSet is the set converted from addrs, it's used for quick lookup of
// an address.
//
// Addresses in this map all have attributes stripped, but metadata set to
// the weight. So that weight change can be detected.
//
// TODO: this won't be necessary if there are ways to compare address
// attributes.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
aNoAttrs := a
// Strip attributes but set Metadata to the weight.
aNoAttrs.Attributes = nil
w := weightedroundrobin.GetAddrInfo(a).Weight
if w == 0 {
// If weight is not set, use 1.
w = 1
}
aNoAttrs.Metadata = w
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
// When creating SubConn, the original address with attributes is
// passed through. So that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
// addrsSet is the set converted from addrs, used for quick lookup.
addrsSet := resolver.NewAddressMap()
for _, addr := range addrs {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
if val, ok := b.subConns.Get(addr); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: a.Addr, sc: sc}
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns[aNoAttrs] = scs
b.subConns.Set(addr, scs)
b.scStates[sc] = scs
addrsUpdated = true
} else {
// Always update the subconn's address in case the attributes
// changed. The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a})
// We have seen this address before and created a subConn for it. If the
// weight associated with the address has changed, update the subConns map
// with the new weight. This will be used when a new ring is created.
//
// There is no need to call UpdateAddresses on the subConn at this point
// since *only* the weight attribute has changed, and that does not affect
// subConn uniqueness.
scInfo := val.(*subConn)
if oldWeight := scInfo.weight; oldWeight != newWeight {
scInfo.weight = newWeight
b.subConns.Set(addr, scInfo)
// Return true to force recreation of the ring.
addrsUpdated = true
}
}
}
for a, scInfo := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
for _, addr := range b.subConns.Keys() {
// addr was removed by resolver.
if _, ok := addrsSet.Get(addr); !ok {
v, _ := b.subConns.Get(addr)
scInfo := v.(*subConn)
b.cc.RemoveSubConn(scInfo.sc)
delete(b.subConns, a)
b.subConns.Delete(addr)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
Expand Down Expand Up @@ -304,7 +296,7 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err

func (b *ringhashBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
if b.subConns.Len() == 0 {
b.state = connectivity.TransientFailure
}

Expand Down Expand Up @@ -392,7 +384,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
// attempting to connect, we need to trigger one. But since the deleted
// SubConn will eventually send a shutdown update, this code will run
// and trigger the next SubConn to connect.
for _, sc := range b.subConns {
for _, v := range b.subConns.Values() {
sc := v.(*subConn)
if sc.isAttemptingToConnect() {
return
}
Expand Down Expand Up @@ -485,3 +478,18 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
}
return connectivity.TransientFailure
}

// getWeightAttribute is a convenience function which returns the value of the
// weight attribute stored in the BalancerAttributes field of addr, using the
// weightedroundrobin package.
//
// When used in the xDS context, the weight attribute is guaranteed to be
// non-zero. But, when used in a non-xDS context, the weight attribute could be
// unset. A Default of 1 is used in the latter case.
func getWeightAttribute(addr resolver.Address) uint32 {
w := weightedroundrobin.GetAddrInfo(addr).Weight
if w == 0 {
return 1
}
return w
}
24 changes: 24 additions & 0 deletions xds/internal/balancer/ringhash/ringhash_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
)

var (
Expand Down Expand Up @@ -491,3 +492,26 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
})
}
}

// TestAddrBalancerAttributesChange tests the case where the ringhash balancer
// receives a ClientConnUpdate with the same config and addresses as received in
// the previous update. Although the `BalancerAttributes` contents are the same,
// the pointer is different. This test verifies that subConns are not recreated
// in this scenario.
func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
cc, b, _ := setupTest(t, addrs1)

addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: nil,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
select {
case <-cc.NewSubConnCh:
t.Fatal("new subConn created for an update with the same addresses")
case <-time.After(defaultTestShortTimeout):
}
}