From 2a7e7c41b3019d2a18fcb54215f09df0bbe33d10 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 24 Aug 2021 15:13:41 -0700 Subject: [PATCH 01/10] [ring_hash_policy] ringhash: all [ring_hash_policy] 111 --- xds/internal/balancer/ringhash/logging.go | 34 ++ xds/internal/balancer/ringhash/picker.go | 7 + xds/internal/balancer/ringhash/ring.go | 11 + xds/internal/balancer/ringhash/ring_test.go | 3 + xds/internal/balancer/ringhash/ringhash.go | 315 +++++++++++- .../balancer/ringhash/ringhash_test.go | 450 ++++++++++++++++++ 6 files changed, 815 insertions(+), 5 deletions(-) create mode 100644 xds/internal/balancer/ringhash/logging.go diff --git a/xds/internal/balancer/ringhash/logging.go b/xds/internal/balancer/ringhash/logging.go new file mode 100644 index 00000000000..64a1d467f55 --- /dev/null +++ b/xds/internal/balancer/ringhash/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package ringhash + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[ring-hash-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index 6d035b0c191..d47bc3490b7 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -19,6 +19,8 @@ package ringhash import ( + "log" + "fmt" "google.golang.org/grpc/balancer" @@ -31,6 +33,10 @@ type picker struct { ring *ring } +func newPicker(ring *ring) *picker { + return &picker{ring: ring} +} + // handleRICSResult is the return type of handleRICS. It's needed to wrap the // returned error from Pick() in a struct. With this, if the return values are // `balancer.PickResult, error, bool`, linter complains because error is not the @@ -75,6 +81,7 @@ func handleRICS(e *ringEntry) (handleRICSResult, bool) { func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { e := p.ring.pick(getRequestHash(info.Ctx)) + log.Printf(" * first item sc %p effinitive state: %v, state: %v", e.sc, e.sc.effectiveState(), e.sc.state) if hr, ok := handleRICS(e); ok { return hr.pr, hr.err } diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index 68e844cfb48..ced456fbd2c 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -20,6 +20,7 @@ package ringhash import ( "fmt" + "log" "math" "sort" "strconv" @@ -66,10 +67,13 @@ type ringEntry struct { // and first item with hash >= given hash will be returned. 
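// For example (illustrative hash values only): with ring entries hashed at
// 100, 200 and 300, a request hash of 150 returns the entry at 200, and a
// request hash of 350 wraps around to the entry at 100.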
func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) { // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 + log.Printf(" - min size: %v, max size: %v", minRingSize, maxRingSize) + log.Printf(" - subConns: %v", subConns) normalizedWeights, minWeight, err := normalizeWeights(subConns) if err != nil { return nil, err } + log.Printf(" - normalized: %v, minWeight: %v", normalizedWeights, minWeight) // Normalized weights for {3,3,4} is {0.3,0.3,0.4}. // Scale up the size of the ring such that the least-weighted host gets a @@ -78,6 +82,7 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui // Note that size is limited by the input max/min. scale := math.Min(math.Ceil(minWeight*float64(minRingSize))/minWeight, float64(maxRingSize)) ringSize := math.Ceil(scale) + log.Printf(" - ringSize: %v", ringSize) items := make([]*ringEntry, 0, int(ringSize)) // For each entry, scale*weight nodes are generated in the ring. @@ -94,7 +99,9 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui ) for _, scw := range normalizedWeights { targetIdx += scale * scw.weight + log.Printf(" -- targetIdx: %v = prev + %v * %v", targetIdx, scale, scw.weight) for float64(idx) < targetIdx { + log.Printf(" --- idx %v, addr: %v, targetIdx %v", idx, scw.sc.addr, targetIdx) h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(len(items))) items = append(items, &ringEntry{idx: idx, hash: h, sc: scw.sc}) idx++ @@ -106,6 +113,10 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui for i, ii := range items { ii.idx = i } + log.Printf(" - ring: \n") + for i, ii := range items { + log.Printf(" -- %v: %+v, %v\n", i, ii, ii.sc.addr) + } return &ring{items: items}, nil } diff --git a/xds/internal/balancer/ringhash/ring_test.go b/xds/internal/balancer/ringhash/ring_test.go index 2d664e05bb2..62b9abfbcf3 100644 --- a/xds/internal/balancer/ringhash/ring_test.go +++ b/xds/internal/balancer/ringhash/ring_test.go @@ -20,6 +20,7 @@ package ringhash import ( "fmt" + "log" "math" "testing" @@ -46,6 +47,7 @@ func TestRingNew(t *testing.T) { for _, min := range []uint64{3, 4, 6, 8} { for _, max := range []uint64{20, 8} { t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) { + log.Println(" ---- ") r, _ := newRing(testSubConnMap, min, max) totalCount := len(r.items) if totalCount < int(min) || totalCount > int(max) { @@ -64,6 +66,7 @@ func TestRingNew(t *testing.T) { t.Fatalf("unexpected item weight in ring: %v != %v", got, want) } } + log.Println(" ---- ") }) } } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index b87cce64801..b1801bfda6a 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -20,15 +20,51 @@ package ringhash import ( + "encoding/json" + "errors" + "fmt" + "log" "sync" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" ) // Name is the name of the ring_hash balancer. 
const Name = "ring_hash_experimental" +func init() { + balancer.Register(bb{}) +} + +type bb struct{} + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &ringhashBalancer{ + cc: cc, + subConns: make(map[resolver.Address]*subConn), + scStates: make(map[balancer.SubConn]*subConn), + csEvltr: &connectivityStateEvaluator{}, + } + b.logger = prefixLogger(b) + b.logger.Infof("Created") + return b +} + +func (bb) Name() string { + return Name +} + +func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return parseConfig(c) +} + type subConn struct { addr string sc balancer.SubConn @@ -66,14 +102,12 @@ type subConn struct { connectQueued bool } -// SetState updates the state of this SubConn. +// setState updates the state of this SubConn. // // It also handles the queued Connect(). If the new state is Idle, and a // Connect() was queued, this SubConn will be triggered to Connect(). -// -// FIXME: unexport this. It's exported so that staticcheck doesn't complain -// about unused functions. -func (sc *subConn) SetState(s connectivity.State) { +func (sc *subConn) setState(s connectivity.State) { + log.Printf(" --- updating sc %p to state %v", sc, s) sc.mu.Lock() defer sc.mu.Unlock() // Any state change to non-Idle means there was an attempt to connect. @@ -121,3 +155,274 @@ func (sc *subConn) queueConnect() { // after backoff in TransientFailure), it will Connect(). sc.connectQueued = true } + +type ringhashBalancer struct { + cc balancer.ClientConn + logger *grpclog.PrefixLogger + + config *LBConfig + + subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses) + scStates map[balancer.SubConn]*subConn + + // ring is always in sync with subConns. When subConns change, a new ring is + // generated. Note that address weights updates (they are keys in the + // subConns map) also regenerates the ring. + ring *ring + picker balancer.Picker + csEvltr *connectivityStateEvaluator + state connectivity.State + + resolverErr error // the last error reported by the resolver; cleared on successful resolution + connErr error // the last connection error; cleared upon leaving TransientFailure +} + +// updateAddresses creates new SubConns and removes SubConns, based on the +// address update. +// +// The return value is whether the new address list is different from the +// previous. True if +// - an addresses was added +// - an addresses was removed +// - an address's weight was updated +// +// Note that this function doesn't trigger SubConn connecting, so all the new +// SubConn states are IDLE. +func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { + var addrsUpdated bool + // addrsSet is the set converted from addrs, it's used for quick lookup of + // an address. + // + // Addresses in this map all have attributes stripped, but metadata set to + // the weight. So that weight change can be detected. + // TODO: this won't be necessary if there are ways to compare address + // attributes. + addrsSet := make(map[resolver.Address]struct{}) + for _, a := range addrs { + aNoAttrs := a + // Strip attributes but set Metadata to the weight. + aNoAttrs.Attributes = nil + w := weightedroundrobin.GetAddrInfo(a).Weight + if w == 0 { + // If weight is not set, use 1. + w = 1 + } + aNoAttrs.Metadata = w + addrsSet[aNoAttrs] = struct{}{} + if scInfo, ok := b.subConns[aNoAttrs]; !ok { + // When creating SubConn, the original address with attributes is + // passed through. 
So that connection configurations in attributes
+			// (like creds) will be used.
+			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
+			if err != nil {
+				logger.Warningf("ringhash: failed to create new SubConn: %v", err)
+				continue
+			}
+			scs := &subConn{addr: a.Addr, sc: sc}
+			scs.setState(connectivity.Idle)
+			b.subConns[aNoAttrs] = scs
+			b.scStates[sc] = scs
+			addrsUpdated = true
+		} else {
+			// Always update the subconn's address in case the attributes
+			// changed. The SubConn does a reflect.DeepEqual of the new and old
+			// addresses. So this is a noop if the current address is the same
+			// as the old one (including attributes).
+			b.subConns[aNoAttrs] = scInfo
+			b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a})
+		}
+	}
+	for a, scInfo := range b.subConns {
+		// a was removed by resolver.
+		if _, ok := addrsSet[a]; !ok {
+			b.cc.RemoveSubConn(scInfo.sc)
+			delete(b.subConns, a)
+			addrsUpdated = true
+			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
+			// The entry will be deleted in UpdateSubConnState.
+		}
+	}
+	return addrsUpdated
+}
+
+func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+	b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
+	if b.config == nil {
+		newConfig, ok := s.BalancerConfig.(*LBConfig)
+		if !ok {
+			return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
+		}
+		b.config = newConfig
+	}
+
+	// Successful resolution; clear resolver error and ensure we return nil.
+	b.resolverErr = nil
+	if b.updateAddresses(s.ResolverState.Addresses) {
+		// If addresses were updated, no matter whether it resulted in SubConn
+		// creation/deletion, or just weight update, we will need to regenerate
+		// the ring.
+		var err error
+		b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
+		if err != nil {
+			panic(err)
+		}
+		b.regeneratePicker()
+		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
+	}
+
+	// If resolver state contains no addresses, return an error so ClientConn
+	// will trigger re-resolve. Also records this as a resolver error, so when
+	// the overall state turns transient failure, the error message will have
+	// the zero address information.
+	if len(s.ResolverState.Addresses) == 0 {
+		b.ResolverError(errors.New("produced zero addresses"))
+		return balancer.ErrBadResolverState
+	}
+	return nil
+}
+
+func (b *ringhashBalancer) ResolverError(err error) {
+	b.resolverErr = err
+	if len(b.subConns) == 0 {
+		b.state = connectivity.TransientFailure
+	}
+
+	if b.state != connectivity.TransientFailure {
+		// The picker will not change since the balancer does not currently
+		// report an error.
+		return
+	}
+	b.regeneratePicker()
+	b.cc.UpdateState(balancer.State{
+		ConnectivityState: b.state,
+		Picker:            b.picker,
+	})
+}
+
+// UpdateSubConnState updates the per-SubConn state stored in the ring, and also
+// the aggregated state.
+//
+// It triggers an update to cc when:
+//  - the new state is TransientFailure, to update the error message
+//    - it's possible that this is a noop, but sending an extra update is easier
+//      than comparing errors
+//  - the aggregated state is changed
+//    - the same picker will be sent again, but this update may trigger a re-pick
+//      for some RPCs.
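+//
+// For example, when a Ready SubConn reports TransientFailure while another
+// SubConn is still Ready, the aggregated state stays Ready, but a
+// regenerated picker is sent so that new picks skip the failing ring
+// entries.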
+func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + s := state.ConnectivityState + b.logger.Infof("handle SubConn state change: %p, %v", sc, s) + scs, ok := b.scStates[sc] + if !ok { + b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s) + return + } + oldSCState := scs.effectiveState() + scs.setState(s) + newSCState := scs.effectiveState() + + var sendUpdate bool + oldBalancerState := b.state + b.state = b.csEvltr.recordTransition(oldSCState, newSCState) + if oldBalancerState != b.state { + sendUpdate = true + } + + switch s { + case connectivity.Idle: + // When the overall state is TransientFailure, this will never get picks + // if there's a lower priority. Need to keep the SubConns connecting so + // there's a chance it will recover. + if b.state == connectivity.TransientFailure { + scs.queueConnect() + } + // Resend the picker, there's no need to regenerate the picker because + // the ring didn't change. + sendUpdate = true + case connectivity.Connecting, connectivity.Ready: + // Resend the picker, there's no need to regenerate the picker because + // the ring didn't change. + sendUpdate = true + case connectivity.TransientFailure: + // Save error to be reported via picker. + b.connErr = state.ConnectionError + // Regenerate picker to update error message. + b.regeneratePicker() + sendUpdate = true + case connectivity.Shutdown: + // When an address was removed by resolver, b called RemoveSubConn but + // kept the sc's state in scStates. Remove state for this sc here. + delete(b.scStates, sc) + } + + if sendUpdate { + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + } +} + +// mergeErrors builds an error from the last connection error and the last +// resolver error. Must only be called if b.state is TransientFailure. +func (b *ringhashBalancer) mergeErrors() error { + // connErr must always be non-nil unless there are no SubConns, in which + // case resolverErr must be non-nil. + if b.connErr == nil { + return fmt.Errorf("last resolver error: %v", b.resolverErr) + } + if b.resolverErr == nil { + return fmt.Errorf("last connection error: %v", b.connErr) + } + return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr) +} + +func (b *ringhashBalancer) regeneratePicker() { + if b.state == connectivity.TransientFailure { + b.picker = base.NewErrPicker(b.mergeErrors()) + return + } + b.picker = newPicker(b.ring) +} + +func (b *ringhashBalancer) Close() {} + +// connectivityStateEvaluator takes the connectivity states of multiple SubConns +// and returns one aggregated connectivity state. +// +// It's not thread safe. +type connectivityStateEvaluator struct { + nums [5]uint64 +} + +// recordTransition records state change happening in subConn and based on that +// it evaluates what aggregated state should be. +// +// - If there is at least one subchannel in READY state, report READY. +// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE. +// - If there is at least one subchannel in CONNECTING state, report CONNECTING. +// - If there is at least one subchannel in IDLE state, report IDLE. +// - Otherwise, report TRANSIENT_FAILURE. +// +// Note that if there are 1 connecting, 2 transient failure, the overall state +// is transient failure. This is because the second transient failure is a +// fallback of the first failing SubConn, and we want to report transient +// failure to failover to the lower priority. 
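+//
+// For example, starting from three SubConns recorded as Idle, the
+// transitions Idle->Connecting, Idle->TransientFailure and
+// Idle->TransientFailure leave the counters at Connecting=1 and
+// TransientFailure=2, so the aggregated state is TransientFailure: the
+// two-failures rule is checked before the Connecting rule.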
+func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { + // Update counters. + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. + cse.nums[state] += updateVal + } + + if cse.nums[connectivity.Ready] > 0 { + return connectivity.Ready + } + if cse.nums[connectivity.TransientFailure] > 1 { + return connectivity.TransientFailure + } + if cse.nums[connectivity.Connecting] > 0 { + return connectivity.Connecting + } + if cse.nums[connectivity.Idle] > 0 { + return connectivity.Idle + } + return connectivity.TransientFailure +} diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index bf5da95bf8b..696824c2ab5 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -19,10 +19,18 @@ package ringhash import ( + "context" + "fmt" + "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/weightedroundrobin" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/testutils" ) @@ -34,5 +42,447 @@ var ( ) const ( + defaultTestTimeout = 10 * time.Second defaultTestShortTimeout = 10 * time.Millisecond + + testBackendAddrsCount = 12 +) + +var ( + testBackendAddrStrs []string + testConfig = &LBConfig{MinRingSize: 1, MaxRingSize: 10} ) + +func init() { + for i := 0; i < testBackendAddrsCount; i++ { + testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) + } +} + +func ctxWithHash(h uint64) context.Context { + return SetRequestHash(context.Background(), h) +} + +// setupTest creates the balancer, and does an initial sanity check. +// +// Ring generated from the addresses: +// - 0: &{idx:0 hash:2320126372778282342 sc:0xc000101540}, 0.0.0.0:0 +// - 1: &{idx:1 hash:8784655576673615208 sc:0xc0001015e0}, 2.2.2.2:2 +// - 2: &{idx:2 hash:11914555407170755292 sc:0xc000101590}, 1.1.1.1:1 +func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientConn, balancer.Balancer, balancer.Picker) { + // t.Helper() + cc := testutils.NewTestClientConn(t) + builder := balancer.Get(Name) + b := builder.Build(cc, balancer.BuildOptions{}) + if b == nil { + t.Fatalf("builder.Build(%s) failed and returned nil", Name) + } + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: testConfig, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + + for _, addr := range addrs { + addr1 := <-cc.NewSubConnAddrsCh + if want := []resolver.Address{addr}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) { + t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{}))) + } + sc1 := <-cc.NewSubConnCh + // All the SubConns start in IDLE, and should not Connect(). + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + t.Errorf("unexpected Connect() from SubConn %v", sc1) + case <-time.After(defaultTestShortTimeout): + } + } + + // Should also have a picker, with all SubConns in IDLE. 
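+	// Reading the picker here also drains NewPickerCh, so any picker a test
+	// sees later comes from that test's own updates.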
+ p1 := <-cc.NewPickerCh + return cc, b, p1 +} + +func TestOneSubConn(t *testing.T) { + wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]} + cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1}) + ring0 := p0.(*picker).ring + + firstHash := ring0.items[0].hash + // firstHash-1 will pick the first (and only) SubConn from the ring. + testHash := firstHash - 1 + // The first pick should be queued, and should trigger Connect() on the only + // SubConn. + if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + sc0 := ring0.items[0].sc.sc + select { + case <-sc0.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + + // Send state updates to Ready. + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // Test pick with one backend. + p1 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) + } + } +} + +/* + */ + +/* +done tests + +- 3 SubConns, affinity, one Ready. One down, pick another. One back Ready, will be picked again. +- 3 SubConns, affinity, different header, another backend, two Ready. + +- All SubConns start in IDLE, and do not connect +- Whenever a subchannel's state changes, a new picker will need to be updated +- If the picked SubConn is in IDLE, it will start to connect, and pick will +queue re-pick +*/ + +// TestThreeBackendsAffinity covers that there are 3 SubConns, RPCs with the +// same hash always pick the same SubConn. When the one picked is down, another +// one will be picked. +func TestThreeSubConnsAffinity(t *testing.T) { + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + {Addr: testBackendAddrStrs[2]}, + } + cc, b, p0 := setupTest(t, wantAddrs) + // This test doesn't update addresses, so this ring will be used by all the + // pickers. + ring0 := p0.(*picker).ring + + firstHash := ring0.items[0].hash + // firstHash+1 will pick the second SubConn from the ring. + testHash := firstHash + 1 + // The first pick should be queued, and should trigger Connect() on the only + // SubConn. + if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + // The picked SubConn should be the second in the ring. + sc0 := ring0.items[1].sc.sc + select { + case <-sc0.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + + // Send state updates to Ready. 
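+	// These updates simulate the ClientConn reporting a successful connection
+	// attempt (Connecting, then Ready) for the picked SubConn.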
+ b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + p1 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) + } + } + + // Turn down the subConn in use. + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + p2 := <-cc.NewPickerCh + // Pick with the same hash should be queued, because the SubConn after the + // first picked is IDLE. + if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + + // The third SubConn in the ring should connect. + sc1 := ring0.items[2].sc.sc + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc1) + } + + // Send state updates to Ready. + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + // New picks should all return this SubConn. + p3 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) + } + } + + // Now, after backoff, the first picked SubConn will turn IDLE. + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + // The picks above should have queued Connect() for the first picked + // SubConn, so this IDLE state change will trigger a Connect(). + select { + case <-sc0.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + + // After the first picked SubConn turn Ready, new picks should return it + // again (even though the second picked SubConn is also Ready). + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + p4 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) + } + } +} + +// TestThreeBackendsAffinity covers that there are 3 SubConns, RPCs with the +// same hash always pick the same SubConn. Then try different hash to pick +// another backend, and verify the first hash still picks the first backend. +func TestThreeSubConnsAffinityMultiple(t *testing.T) { + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + {Addr: testBackendAddrStrs[2]}, + } + cc, b, p0 := setupTest(t, wantAddrs) + // This test doesn't update addresses, so this ring will be used by all the + // pickers. + ring0 := p0.(*picker).ring + + firstHash := ring0.items[0].hash + // firstHash+1 will pick the second SubConn from the ring. 
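+	// (Ring entries are sorted by hash and pick returns the first entry with
+	// hash >= the request hash, so a hash just above items[0].hash lands on
+	// items[1].)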
+ testHash := firstHash + 1 + // The first pick should be queued, and should trigger Connect() on the only + // SubConn. + if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + sc0 := ring0.items[1].sc.sc + select { + case <-sc0.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + + // Send state updates to Ready. + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // First hash should always pick sc0. + p1 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) + } + } + + secondHash := ring0.items[1].hash + // secondHash+1 will pick the third SubConn from the ring. + testHash2 := secondHash + 1 + if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + sc1 := ring0.items[2].sc.sc + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc1) + } + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // With the new generated picker, hash2 always picks sc1. + p2 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) + } + } + // But the first hash still picks sc0. + for i := 0; i < 5; i++ { + gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) + } + } +} + +func TestAddrWeightChange(t *testing.T) { + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + {Addr: testBackendAddrStrs[2]}, + } + cc, b, p0 := setupTest(t, wantAddrs) + ring0 := p0.(*picker).ring + + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: wantAddrs}, + BalancerConfig: nil, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + select { + case <-cc.NewPickerCh: + t.Fatalf("unexpected picker after UpdateClientConn with the same addresses") + case <-time.After(defaultTestShortTimeout): + } + + // Delete an address, should send a new Picker. 
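+	// Removing an address changes the subConns map, so the balancer must
+	// regenerate the ring and send a picker wrapping the new ring.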
+ if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + }}, + BalancerConfig: nil, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + var p1 balancer.Picker + select { + case p1 = <-cc.NewPickerCh: + case <-time.After(defaultTestTimeout): + t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses") + } + ring1 := p1.(*picker).ring + if ring1 == ring0 { + t.Fatalf("new picker after removing address has the same ring as before, want different") + } + + // Another update with the same addresses, but different weight. + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + weightedroundrobin.SetAddrInfo( + resolver.Address{Addr: testBackendAddrStrs[1]}, + weightedroundrobin.AddrInfo{Weight: 2}), + }}, + BalancerConfig: nil, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + var p2 balancer.Picker + select { + case p2 = <-cc.NewPickerCh: + case <-time.After(defaultTestTimeout): + t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses") + } + if p2.(*picker).ring == ring1 { + t.Fatalf("new picker after changing address weight has the same ring as before, want different") + } +} + +// TestSubConnToConnectWhenOverallTransientFailure covers the situation when the +// overall state is TransientFailure, the SubConns turning IDLE will be +// triggered to Connect(). But not when the overall state is not +// TransientFailure. +func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + {Addr: testBackendAddrStrs[2]}, + } + _, b, p0 := setupTest(t, wantAddrs) + ring0 := p0.(*picker).ring + + // Turn all SubConns to TransientFailure. + for _, it := range ring0.items { + b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + } + + // The next one turning IDLE should Connect(). + sc0 := ring0.items[0].sc.sc + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + select { + case <-sc0.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + + // If this SubConn is ready. Other SubConns turning IDLE will not Connect(). + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // The third SubConn in the ring should connect. + sc1 := ring0.items[1].sc.sc + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + t.Errorf("unexpected Connect() from SubConn %v", sc1) + case <-time.After(defaultTestShortTimeout): + } +} + +/* +- Whenever the address list changes, a new ring and a new picker will be generated + - address change + - weights change + - no change, no update +- If the picker SubConn is in IDLE, but was after Ready (and after +TransientFailure), it will start to connect, and pick will also check the next +available SubConn (this is different the init IDLE, pick will queue re-pick). 
+*/ + +func TestConnectivityStateEvaluatorRecordTransition(t *testing.T) { + tests := []struct { + name string + from, to []connectivity.State + want connectivity.State + }{ + { + name: "one ready", + from: []connectivity.State{connectivity.Idle}, + to: []connectivity.State{connectivity.Ready}, + want: connectivity.Ready, + }, + { + name: "one connecting", + from: []connectivity.State{connectivity.Idle}, + to: []connectivity.State{connectivity.Connecting}, + want: connectivity.Connecting, + }, + { + name: "one ready one transient failure", + from: []connectivity.State{connectivity.Idle, connectivity.Idle}, + to: []connectivity.State{connectivity.Ready, connectivity.TransientFailure}, + want: connectivity.Ready, + }, + { + name: "one connecting one transient failure", + from: []connectivity.State{connectivity.Idle, connectivity.Idle}, + to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, + want: connectivity.Connecting, + }, + { + name: "one connecting two transient failure", + from: []connectivity.State{connectivity.Idle, connectivity.Idle, connectivity.Idle}, + to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure, connectivity.TransientFailure}, + want: connectivity.TransientFailure, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cse := &connectivityStateEvaluator{} + var got connectivity.State + for i, fff := range tt.from { + ttt := tt.to[i] + got = cse.recordTransition(fff, ttt) + } + if got != tt.want { + t.Errorf("recordTransition() = %v, want %v", got, tt.want) + } + }) + } +} From a72704269f51978888b0aedcc503fa97997451df Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 7 Sep 2021 12:59:57 -0700 Subject: [PATCH 02/10] [ring_hash_policy] delete logs --- xds/internal/balancer/ringhash/picker.go | 3 --- xds/internal/balancer/ringhash/ring.go | 11 ----------- xds/internal/balancer/ringhash/ring_test.go | 3 --- xds/internal/balancer/ringhash/ringhash.go | 2 -- 4 files changed, 19 deletions(-) diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index d47bc3490b7..af5f1f0e80c 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -19,8 +19,6 @@ package ringhash import ( - "log" - "fmt" "google.golang.org/grpc/balancer" @@ -81,7 +79,6 @@ func handleRICS(e *ringEntry) (handleRICSResult, bool) { func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { e := p.ring.pick(getRequestHash(info.Ctx)) - log.Printf(" * first item sc %p effinitive state: %v, state: %v", e.sc, e.sc.effectiveState(), e.sc.state) if hr, ok := handleRICS(e); ok { return hr.pr, hr.err } diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index ced456fbd2c..68e844cfb48 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -20,7 +20,6 @@ package ringhash import ( "fmt" - "log" "math" "sort" "strconv" @@ -67,13 +66,10 @@ type ringEntry struct { // and first item with hash >= given hash will be returned. 
func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) { // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 - log.Printf(" - min size: %v, max size: %v", minRingSize, maxRingSize) - log.Printf(" - subConns: %v", subConns) normalizedWeights, minWeight, err := normalizeWeights(subConns) if err != nil { return nil, err } - log.Printf(" - normalized: %v, minWeight: %v", normalizedWeights, minWeight) // Normalized weights for {3,3,4} is {0.3,0.3,0.4}. // Scale up the size of the ring such that the least-weighted host gets a @@ -82,7 +78,6 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui // Note that size is limited by the input max/min. scale := math.Min(math.Ceil(minWeight*float64(minRingSize))/minWeight, float64(maxRingSize)) ringSize := math.Ceil(scale) - log.Printf(" - ringSize: %v", ringSize) items := make([]*ringEntry, 0, int(ringSize)) // For each entry, scale*weight nodes are generated in the ring. @@ -99,9 +94,7 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui ) for _, scw := range normalizedWeights { targetIdx += scale * scw.weight - log.Printf(" -- targetIdx: %v = prev + %v * %v", targetIdx, scale, scw.weight) for float64(idx) < targetIdx { - log.Printf(" --- idx %v, addr: %v, targetIdx %v", idx, scw.sc.addr, targetIdx) h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(len(items))) items = append(items, &ringEntry{idx: idx, hash: h, sc: scw.sc}) idx++ @@ -113,10 +106,6 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui for i, ii := range items { ii.idx = i } - log.Printf(" - ring: \n") - for i, ii := range items { - log.Printf(" -- %v: %+v, %v\n", i, ii, ii.sc.addr) - } return &ring{items: items}, nil } diff --git a/xds/internal/balancer/ringhash/ring_test.go b/xds/internal/balancer/ringhash/ring_test.go index 62b9abfbcf3..2d664e05bb2 100644 --- a/xds/internal/balancer/ringhash/ring_test.go +++ b/xds/internal/balancer/ringhash/ring_test.go @@ -20,7 +20,6 @@ package ringhash import ( "fmt" - "log" "math" "testing" @@ -47,7 +46,6 @@ func TestRingNew(t *testing.T) { for _, min := range []uint64{3, 4, 6, 8} { for _, max := range []uint64{20, 8} { t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) { - log.Println(" ---- ") r, _ := newRing(testSubConnMap, min, max) totalCount := len(r.items) if totalCount < int(min) || totalCount > int(max) { @@ -66,7 +64,6 @@ func TestRingNew(t *testing.T) { t.Fatalf("unexpected item weight in ring: %v != %v", got, want) } } - log.Println(" ---- ") }) } } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index b1801bfda6a..07702b3370d 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -23,7 +23,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "sync" "google.golang.org/grpc/balancer" @@ -107,7 +106,6 @@ type subConn struct { // It also handles the queued Connect(). If the new state is Idle, and a // Connect() was queued, this SubConn will be triggered to Connect(). func (sc *subConn) setState(s connectivity.State) { - log.Printf(" --- updating sc %p to state %v", sc, s) sc.mu.Lock() defer sc.mu.Unlock() // Any state change to non-Idle means there was an attempt to connect. 
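Note on how the request hash reaches the picker: the tests in this series
attach it with SetRequestHash (see ctxWithHash in ringhash_test.go), while in
production the xDS config selector derives it from the route's hash policy. A
minimal sketch of exercising the picker directly, assuming an already-built
*picker p (the "some-key" value is a hypothetical stand-in for whatever the
hash policy hashes):

	ctx := SetRequestHash(context.Background(), xxhash.Sum64String("some-key"))
	res, err := p.Pick(balancer.PickInfo{Ctx: ctx})
	if err == balancer.ErrNoSubConnAvailable {
		// The chosen SubConn is not Ready yet; gRPC blocks the RPC and
		// re-picks once a new picker is published.
	}
	_ = res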
From 4b3e8b02ec3c5cc6f8f9d632229e3e38722ac37c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 7 Sep 2021 13:38:41 -0700 Subject: [PATCH 03/10] [ring_hash_policy] cleanup --- xds/internal/balancer/ringhash/ringhash.go | 1 + .../balancer/ringhash/ringhash_test.go | 32 +------------------ 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 07702b3370d..f64b447654e 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -193,6 +193,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { // // Addresses in this map all have attributes stripped, but metadata set to // the weight. So that weight change can be detected. + // // TODO: this won't be necessary if there are ways to compare address // attributes. addrsSet := make(map[resolver.Address]struct{}) diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index 696824c2ab5..ba92b55f7b5 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -64,13 +64,8 @@ func ctxWithHash(h uint64) context.Context { } // setupTest creates the balancer, and does an initial sanity check. -// -// Ring generated from the addresses: -// - 0: &{idx:0 hash:2320126372778282342 sc:0xc000101540}, 0.0.0.0:0 -// - 1: &{idx:1 hash:8784655576673615208 sc:0xc0001015e0}, 2.2.2.2:2 -// - 2: &{idx:2 hash:11914555407170755292 sc:0xc000101590}, 1.1.1.1:1 func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientConn, balancer.Balancer, balancer.Picker) { - // t.Helper() + t.Helper() cc := testutils.NewTestClientConn(t) builder := balancer.Get(Name) b := builder.Build(cc, balancer.BuildOptions{}) @@ -137,21 +132,6 @@ func TestOneSubConn(t *testing.T) { } } -/* - */ - -/* -done tests - -- 3 SubConns, affinity, one Ready. One down, pick another. One back Ready, will be picked again. -- 3 SubConns, affinity, different header, another backend, two Ready. - -- All SubConns start in IDLE, and do not connect -- Whenever a subchannel's state changes, a new picker will need to be updated -- If the picked SubConn is in IDLE, it will start to connect, and pick will -queue re-pick -*/ - // TestThreeBackendsAffinity covers that there are 3 SubConns, RPCs with the // same hash always pick the same SubConn. When the one picked is down, another // one will be picked. @@ -425,16 +405,6 @@ func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { } } -/* -- Whenever the address list changes, a new ring and a new picker will be generated - - address change - - weights change - - no change, no update -- If the picker SubConn is in IDLE, but was after Ready (and after -TransientFailure), it will start to connect, and pick will also check the next -available SubConn (this is different the init IDLE, pick will queue re-pick). 
-*/ - func TestConnectivityStateEvaluatorRecordTransition(t *testing.T) { tests := []struct { name string From d68ccdbf0a8f8f7c8fc59b3daf517dd13528add0 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 7 Sep 2021 13:40:42 -0700 Subject: [PATCH 04/10] [ring_hash_policy] IDLE -> Idle --- xds/internal/balancer/ringhash/ringhash.go | 4 ++-- xds/internal/balancer/ringhash/ringhash_test.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index f64b447654e..38911cc72a9 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -185,7 +185,7 @@ type ringhashBalancer struct { // - an address's weight was updated // // Note that this function doesn't trigger SubConn connecting, so all the new -// SubConn states are IDLE. +// SubConn states are Idle. func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { var addrsUpdated bool // addrsSet is the set converted from addrs, it's used for quick lookup of @@ -397,7 +397,7 @@ type connectivityStateEvaluator struct { // - If there is at least one subchannel in READY state, report READY. // - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE. // - If there is at least one subchannel in CONNECTING state, report CONNECTING. -// - If there is at least one subchannel in IDLE state, report IDLE. +// - If there is at least one subchannel in Idle state, report Idle. // - Otherwise, report TRANSIENT_FAILURE. // // Note that if there are 1 connecting, 2 transient failure, the overall state diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index ba92b55f7b5..fb85367e4a4 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -85,7 +85,7 @@ func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientCon t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{}))) } sc1 := <-cc.NewSubConnCh - // All the SubConns start in IDLE, and should not Connect(). + // All the SubConns start in Idle, and should not Connect(). select { case <-sc1.(*testutils.TestSubConn).ConnectCh: t.Errorf("unexpected Connect() from SubConn %v", sc1) @@ -93,7 +93,7 @@ func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientCon } } - // Should also have a picker, with all SubConns in IDLE. + // Should also have a picker, with all SubConns in Idle. p1 := <-cc.NewPickerCh return cc, b, p1 } @@ -177,7 +177,7 @@ func TestThreeSubConnsAffinity(t *testing.T) { b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) p2 := <-cc.NewPickerCh // Pick with the same hash should be queued, because the SubConn after the - // first picked is IDLE. + // first picked is Idle. if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) } @@ -202,10 +202,10 @@ func TestThreeSubConnsAffinity(t *testing.T) { } } - // Now, after backoff, the first picked SubConn will turn IDLE. + // Now, after backoff, the first picked SubConn will turn Idle. 
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) // The picks above should have queued Connect() for the first picked - // SubConn, so this IDLE state change will trigger a Connect(). + // SubConn, so this Idle state change will trigger a Connect(). select { case <-sc0.(*testutils.TestSubConn).ConnectCh: case <-time.After(defaultTestTimeout): @@ -365,7 +365,7 @@ func TestAddrWeightChange(t *testing.T) { } // TestSubConnToConnectWhenOverallTransientFailure covers the situation when the -// overall state is TransientFailure, the SubConns turning IDLE will be +// overall state is TransientFailure, the SubConns turning Idle will be // triggered to Connect(). But not when the overall state is not // TransientFailure. func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { @@ -382,7 +382,7 @@ func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) } - // The next one turning IDLE should Connect(). + // The next one turning Idle should Connect(). sc0 := ring0.items[0].sc.sc b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) select { @@ -391,7 +391,7 @@ func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) } - // If this SubConn is ready. Other SubConns turning IDLE will not Connect(). + // If this SubConn is ready. Other SubConns turning Idle will not Connect(). b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) From fcca6dc43b7308143a2c64710f600571900374e8 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 7 Sep 2021 13:44:16 -0700 Subject: [PATCH 05/10] [ring_hash_policy] picker error log --- xds/internal/balancer/ringhash/picker.go | 17 +++++++++-------- xds/internal/balancer/ringhash/ringhash.go | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index af5f1f0e80c..a049a99db02 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -24,15 +24,17 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/status" ) type picker struct { - ring *ring + ring *ring + logger *grpclog.PrefixLogger } -func newPicker(ring *ring) *picker { - return &picker{ring: ring} +func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker { + return &picker{ring: ring, logger: logger} } // handleRICSResult is the return type of handleRICS. It's needed to wrap the @@ -51,7 +53,7 @@ type handleRICSResult struct { // The first return value indicates if the state is in Ready, Idle, Connecting // or Shutdown. If it's true, the PickResult and error should be returned from // Pick() as is. -func handleRICS(e *ringEntry) (handleRICSResult, bool) { +func handleRICS(e *ringEntry, logger *grpclog.PrefixLogger) (handleRICSResult, bool) { switch state := e.sc.effectiveState(); state { case connectivity.Ready: return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true @@ -71,15 +73,14 @@ func handleRICS(e *ringEntry) (handleRICSResult, bool) { default: // Should never reach this. 
All the connectivity states are already // handled in the cases. - // - // FIXME: add an error log. + logger.Errorf("SubConn has undefined connectivity state: %v", state) return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true } } func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { e := p.ring.pick(getRequestHash(info.Ctx)) - if hr, ok := handleRICS(e); ok { + if hr, ok := handleRICS(e, p.logger); ok { return hr.pr, hr.err } // ok was false, the entry is in transient failure. @@ -99,7 +100,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro // For the second SubConn, also check Ready/Idle/Connecting as if it's the // first entry. - if hr, ok := handleRICS(e2); ok { + if hr, ok := handleRICS(e2, p.logger); ok { return hr.pr, hr.err } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 38911cc72a9..a561e273d0e 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -378,7 +378,7 @@ func (b *ringhashBalancer) regeneratePicker() { b.picker = base.NewErrPicker(b.mergeErrors()) return } - b.picker = newPicker(b.ring) + b.picker = newPicker(b.ring, b.logger) } func (b *ringhashBalancer) Close() {} From 85ddb64e9b703550bf767e9b8b0c55dd04612913 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 14 Sep 2021 09:53:49 -0700 Subject: [PATCH 06/10] subconn init IDLE --- xds/internal/balancer/ringhash/ringhash.go | 1 + 1 file changed, 1 insertion(+) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index a561e273d0e..5f714bf6cab 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -219,6 +219,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { } scs := &subConn{addr: a.Addr, sc: sc} scs.setState(connectivity.Idle) + b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle) b.subConns[aNoAttrs] = scs b.scStates[sc] = scs addrsUpdated = true From 1ae76b146c61065cff40fe4f74523c76d0a984ff Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 14 Sep 2021 14:16:18 -0700 Subject: [PATCH 07/10] [ring_hash_policy] c1 --- xds/internal/balancer/ringhash/picker.go | 8 ++++---- xds/internal/balancer/ringhash/ringhash.go | 14 ++++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index a049a99db02..dcea6d46e51 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -53,7 +53,7 @@ type handleRICSResult struct { // The first return value indicates if the state is in Ready, Idle, Connecting // or Shutdown. If it's true, the PickResult and error should be returned from // Pick() as is. -func handleRICS(e *ringEntry, logger *grpclog.PrefixLogger) (handleRICSResult, bool) { +func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { switch state := e.sc.effectiveState(); state { case connectivity.Ready: return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true @@ -73,14 +73,14 @@ func handleRICS(e *ringEntry, logger *grpclog.PrefixLogger) (handleRICSResult, b default: // Should never reach this. All the connectivity states are already // handled in the cases. 
- logger.Errorf("SubConn has undefined connectivity state: %v", state) + p.logger.Errorf("SubConn has undefined connectivity state: %v", state) return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true } } func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { e := p.ring.pick(getRequestHash(info.Ctx)) - if hr, ok := handleRICS(e, p.logger); ok { + if hr, ok := p.handleRICS(e); ok { return hr.pr, hr.err } // ok was false, the entry is in transient failure. @@ -100,7 +100,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro // For the second SubConn, also check Ready/Idle/Connecting as if it's the // first entry. - if hr, ok := handleRICS(e2, p.logger); ok { + if hr, ok := p.handleRICS(e2); ok { return hr.pr, hr.err } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 5f714bf6cab..79395966a8b 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -180,8 +180,8 @@ type ringhashBalancer struct { // // The return value is whether the new address list is different from the // previous. True if -// - an addresses was added -// - an addresses was removed +// - an address was added +// - an address was removed // - an address's weight was updated // // Note that this function doesn't trigger SubConn connecting, so all the new @@ -336,10 +336,12 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance if b.state == connectivity.TransientFailure { scs.queueConnect() } - // Resend the picker, there's no need to regenerate the picker because - // the ring didn't change. - sendUpdate = true - case connectivity.Connecting, connectivity.Ready: + // No need to send an update. No queued RPC can be unblocked. If the + // overall state changed because of this, sendUpdate is already true. + case connectivity.Connecting: + // No need to send an update. No queued RPC can be unblocked. If the + // overall state changed because of this, sendUpdate is already true. + case connectivity.Ready: // Resend the picker, there's no need to regenerate the picker because // the ring didn't change. sendUpdate = true From 6b1eace35ef478dd130ce7d7a9feb5e4d73a3440 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 14 Sep 2021 16:07:44 -0700 Subject: [PATCH 08/10] [ring_hash_policy] fix effective state --- xds/internal/balancer/ringhash/ringhash.go | 46 ++++++++++++---------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 79395966a8b..9f7662ef680 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -72,26 +72,25 @@ type subConn struct { // This is the actual state of this SubConn (as updated by the ClientConn). // The effective state can be different, see comment of attemptedToConnect. state connectivity.State - // attemptedToConnect is whether this SubConn has attempted to connect ever. - // So that only the initial Idle is Idle, after any attempt to connect, - // following Idles are all TransientFailure. + // failing is whether this SubConn is in a failing state. A subConn is + // considered to be in a failing state if it was previously in + // TransientFailure. // - // This affects the effective connectivity state of this SubConn, e.g. 
if - // the actual state is Idle, but this SubConn has attempted to connect, the - // effective state is TransientFailure. + // This affects the effective connectivity state of this SubConn, e.g. + // - if the actual state is Idle or Connecting, but this SubConn is failing, + // the effective state is TransientFailure. // - // This is used in pick(). E.g. if a subConn is Idle, but has - // attemptedToConnect as true, pick() will + // This is used in pick(). E.g. if a subConn is Idle, but has failing as + // true, pick() will // - consider this SubConn as TransientFailure, and check the state of the // next SubConn. // - trigger Connect() (note that normally a SubConn in real // TransientFailure cannot Connect()) // - // Note this should only be set when updating the state (from Idle to - // anything else), not when Connect() is called, because there's a small - // window after the first Connect(), before the state switches to something - // else. - attemptedToConnect bool + // A subConn starts in non-failing (failing is false). A transition to + // TransientFailure set failing to true (and it stay true). A transition to + // Ready sets failing to false. + failing bool // connectQueued is true if a Connect() was queued for this SubConn while // it's not in Idle (most likely was in TransientFailure). A Connect() will // be triggered on this SubConn when it turns Idle. @@ -108,10 +107,6 @@ type subConn struct { func (sc *subConn) setState(s connectivity.State) { sc.mu.Lock() defer sc.mu.Unlock() - // Any state change to non-Idle means there was an attempt to connect. - if s != connectivity.Idle { - sc.attemptedToConnect = true - } switch s { case connectivity.Idle: // Trigger Connect() if new state is Idle, and there is a queued connect. @@ -119,21 +114,30 @@ func (sc *subConn) setState(s connectivity.State) { sc.connectQueued = false sc.sc.Connect() } - case connectivity.Connecting, connectivity.Ready: + case connectivity.Connecting: // Clear connectQueued if the SubConn isn't failing. This state // transition is unlikely to happen, but handle this just in case. sc.connectQueued = false + case connectivity.Ready: + // Clear connectQueued if the SubConn isn't failing. This state + // transition is unlikely to happen, but handle this just in case. + sc.connectQueued = false + // Set to a non-failing state. + sc.failing = false + case connectivity.TransientFailure: + // Set to a failing state. + sc.failing = true } sc.state = s } // effectiveState returns the effective state of this SubConn. It can be -// different from the actual state, e.g. Idle after any attempt to connect (any -// Idle other than the initial Idle) is considered TransientFailure. +// different from the actual state, e.g. Idle while the subConn is failing is +// considered TransientFailure. Read comment of field failing for other cases. 
func (sc *subConn) effectiveState() connectivity.State { sc.mu.RLock() defer sc.mu.RUnlock() - if sc.state == connectivity.Idle && sc.attemptedToConnect { + if sc.failing && (sc.state == connectivity.Idle || sc.state == connectivity.Connecting) { return connectivity.TransientFailure } return sc.state From 9fe1f30896b5ca8ef43a41a85a3d08bec75cf83b Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 14 Sep 2021 15:38:03 -0700 Subject: [PATCH 09/10] [ring_hash_policy] [ring_hash_policy_working] new test --- xds/internal/test/xds_client_affinity_test.go | 136 ++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 xds/internal/test/xds_client_affinity_test.go diff --git a/xds/internal/test/xds_client_affinity_test.go b/xds/internal/test/xds_client_affinity_test.go new file mode 100644 index 00000000000..e9ddfe157b1 --- /dev/null +++ b/xds/internal/test/xds_client_affinity_test.go @@ -0,0 +1,136 @@ +//go:build !386 +// +build !386 + +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "testing" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/xds/env" + testpb "google.golang.org/grpc/test/grpc_testing" + "google.golang.org/grpc/xds/internal/testutils/e2e" +) + +const hashHeaderName = "session_id" + +// hashRouteConfig returns a RouteConfig resource with hash policy set to +// header "session_id". +func hashRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: hashHeaderName, + }, + }, + Terminal: true, + }}, + }}, + }}, + }}, + } +} + +// ringhashCluster returns a Cluster resource that picks ringhash as the lb +// policy. 
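hashRouteConfig above hashes on the request header session_id, so affinity is driven entirely by client-supplied metadata. The sanity test below only checks that the config propagates and never sets that header; a client that wants sticky routing would attach it to each RPC. A minimal sketch, assuming the extra google.golang.org/grpc/metadata import (withSessionID is a hypothetical helper, not part of this test):

    // RPCs carrying the same session_id value hash to the same ring entry,
    // and therefore to the same backend, under the route config above.
    func withSessionID(ctx context.Context, id string) context.Context {
        return metadata.AppendToOutgoingContext(ctx, hashHeaderName, id)
    }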
+func ringhashCluster(clusterName, edsServiceName string) *v3clusterpb.Cluster { + return &v3clusterpb.Cluster{ + Name: clusterName, + ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, + EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + EdsConfig: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ + Ads: &v3corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: edsServiceName, + }, + LbPolicy: v3clusterpb.Cluster_RING_HASH, + } +} + +// TestClientSideAffinitySanityCheck tests that the affinity config can be +// propagated to pick the ring_hash policy. It doesn't test the affinity +// behavior in ring_hash policy. +func (s) TestClientSideAffinitySanityCheck(t *testing.T) { + defer func() func() { + old := env.RingHashSupport + env.RingHashSupport = true + return func() { env.RingHashSupport = old } + }()() + + managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) + defer cleanup1() + + port, cleanup2 := clientSetup(t, &testService{}) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + // Replace RDS and CDS resources with ringhash config, but keep the resource + // names. + resources.Routes = []*v3routepb.RouteConfiguration{hashRouteConfig( + resources.Routes[0].Name, + resources.Listeners[0].Name, + resources.Clusters[0].Name, + )} + resources.Clusters = []*v3clusterpb.Cluster{ringhashCluster( + resources.Clusters[0].Name, + resources.Clusters[0].EdsClusterConfig.ServiceName, + )} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testpb.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} From 5beb75ecf8c520257af322543d41fa41b77968b0 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 15 Sep 2021 13:23:12 -0700 Subject: [PATCH 10/10] [ring_hash_policy] +s --- xds/internal/balancer/ringhash/ringhash.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 9f7662ef680..f8a47f165bd 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -88,8 +88,8 @@ type subConn struct { // TransientFailure cannot Connect()) // // A subConn starts in non-failing (failing is false). A transition to - // TransientFailure set failing to true (and it stay true). A transition to - // Ready sets failing to false. + // TransientFailure sets failing to true (and it stays true). A transition + // to Ready sets failing to false. failing bool // connectQueued is true if a Connect() was queued for this SubConn while // it's not in Idle (most likely was in TransientFailure). A Connect() will
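The trailing context above ends at the connectQueued field, whose other half is the queueConnect method invoked from UpdateSubConnState earlier in the series. Its body is not part of these patches; the following is a minimal sketch consistent with the Idle case of setState in patch 08 (treat the exact body as an assumption):

    // queueConnect requests a Connect() on this SubConn. If it is already
    // Idle, connect immediately; otherwise (most likely TransientFailure)
    // record the request so setState triggers Connect() on the next
    // transition to Idle, i.e. after the backoff period.
    func (sc *subConn) queueConnect() {
        sc.mu.Lock()
        defer sc.mu.Unlock()
        if sc.state == connectivity.Idle {
            sc.sc.Connect()
            return
        }
        sc.connectQueued = true
    }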