Skip to content

Commit

Permalink
Responded to Menghan's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Mar 22, 2022
1 parent 6f346a3 commit 48f844b
Showing 1 changed file with 77 additions and 50 deletions.
127 changes: 77 additions & 50 deletions internal/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package balancergroup

import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
Expand All @@ -34,6 +36,11 @@ import (
"google.golang.org/grpc/resolver"
)

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)

var (
rrBuilder = balancer.Get(roundrobin.Name)
testBalancerIDs = []string{"b1", "b2", "b3"}
Expand Down Expand Up @@ -553,15 +560,16 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
bg.Start()

m1 := make(map[resolver.Address]balancer.SubConn)
scs := make(map[balancer.SubConn]bool)
for i := 0; i < 2; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
scs[sc] = true
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}

// Should behave like a normal rr picker
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
Expand All @@ -573,79 +581,98 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
// The balancer type for testBalancersIDs[0] is currently Round Robin. Now,
// change it to a balancer that has separate behavior logically (creating
// SubConn for second address in address list and always picking that
// SubConn), and see if the aggregated picker reflects that change.
bg.UpdateBuilder(testBalancerIDs[0], secondAddressBalancerBuilder{})
// SubConn), and see if the downstream behavior reflects that change.
bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{})
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
t.Fatalf("error updating ClientConn state: %v", err)
}

addrs := <-cc.NewSubConnAddrsCh
if addrs[0].Addr != testBackendAddrs[3].Addr {
// Verifies forwarded to new created balancer, as the wrapped pick first
// balancer will delete first address.
t.Fatalf("newSubConn called with wrong address, want: %v, got : %v", testBackendAddrs[3].Addr, addrs[0].Addr)
}
sc := <-cc.NewSubConnCh

// Pending being ready should forward the new picker (wrapped in a weighted
// aggregator) to the ClientConn which should only return SubConn
// corresponding to address 3.
// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-cc.NewPickerCh:
t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
case <-ctx.Done():
}

// Update the pick first balancers SubConn as READY. This will cause
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn which
// corresponds to address 3.
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
sc,
pr, err := p2.Pick(balancer.PickInfo{})
if err != nil {
t.Fatalf("error picking: %v", err)
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
if pr.SubConn != sc {
t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn)
}
}

type secondAddressBalancerBuilder struct{}

func (secondAddressBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &secondAddressBalancer{
cc: cc,
// The Graceful Switch process completing for the child should cause the
// SubConns for the balancer being gracefully switched from to get deleted.
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
t.Fatalf("error waiting for RemoveSubConn()")
case sc := <-cc.RemoveSubConnCh:
// The SubConn removed should have been one of the two created
// SubConns, and both should be deleted.
if ok := scs[sc]; ok {
delete(scs, sc)
continue
} else {
t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs)
}
}
}
}

func (secondAddressBalancerBuilder) Name() string {
return "secondAddressBalancer"
}
type wrappedPickFirstBalancerBuilder struct{}

type secondAddressBalancer struct {
cc balancer.ClientConn
p balancer.Picker
}

func (mb *secondAddressBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Hardcode this balancer to only deal with the second address to verify
// that it's this balancer type in the balancer group.
sc, err := mb.cc.NewSubConn(ccs.ResolverState.Addresses[1:2], balancer.NewSubConnOptions{})
if err != nil {
return err
func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(grpc.PickFirstBalancerName)
wpfb := &wrappedPickFirstBalancer{
ClientConn: cc,
}

mb.p = &secondAddressBalancerPicker{
sc: sc,
}

// READY will move this balancer into current, causing it to Update Client
// Conn with new Picker for this specific type of balancer, not round robin.
mb.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: mb.p,
})
return nil
pf := builder.Build(wpfb, opts)
wpfb.Balancer = pf
return wpfb
}

func (mb *secondAddressBalancer) ResolverError(err error) {}

func (mb *secondAddressBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
func (wrappedPickFirstBalancerBuilder) Name() string {
return "wrappedPickFirstBalancer"
}

func (mb *secondAddressBalancer) Close() {}
type wrappedPickFirstBalancer struct {
balancer.Balancer
balancer.ClientConn
}

type secondAddressBalancerPicker struct {
sc balancer.SubConn
func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
s.ResolverState.Addresses = s.ResolverState.Addresses[1:]
return wb.Balancer.UpdateClientConnState(s)
}

func (p *secondAddressBalancerPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{SubConn: p.sc}, nil
func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
// Eat it if IDLE - allows it to switch over only on a READY SubConn.
if state.ConnectivityState == connectivity.Idle {
return
}
wb.ClientConn.UpdateState(state)
}

0 comments on commit 48f844b

Please sign in to comment.