Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster manager: Add Graceful Switch functionality to Cluster Manager #5265

Merged
merged 1 commit into from Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager.go
Expand Up @@ -93,6 +93,11 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
b.stateAggregator.add(name)
// Then add to the balancer group.
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
} else {
// Already present, check for type change and if so send down a new builder.
if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name {
b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name))
}
}
// TODO: handle error? How to aggregate errors and return?
_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
Expand Down
131 changes: 131 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
Expand All @@ -48,6 +49,11 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// Timeouts shared by the tests in this file.
const (
// defaultTestTimeout bounds waits for events that are expected to happen,
// e.g. waiting for RemoveSubConn() after a graceful switch completes.
defaultTestTimeout = 5 * time.Second
// defaultTestShortTimeout bounds waits used to verify that an event does
// NOT happen, e.g. that no new picker is sent while a graceful switch is
// still pending.
defaultTestShortTimeout = 10 * time.Millisecond
)

var (
rtBuilder balancer.Builder
rtParser balancer.ConfigParser
Expand Down Expand Up @@ -102,6 +108,7 @@ func init() {
rtParser = rtBuilder.(balancer.ConfigParser)

balancer.Register(&ignoreAttrsRRBuilder{balancer.Get(roundrobin.Name)})
balancer.Register(wrappedPickFirstBalancerBuilder{})

balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
Expand Down Expand Up @@ -632,3 +639,127 @@ func TestInitialIdle(t *testing.T) {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}

// TestClusterGracefulSwitch tests the graceful switch functionality for a child
// of the cluster manager. At first, the child is configured as a round robin
// load balancer, and thus should behave accordingly. The test then gracefully
// switches this child to a pick first load balancer. Once that balancer updates
// it's state and completes the graceful switch process the new picker should
// reflect this change.
func TestClusterGracefulSwitch(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})

configJSON1 := `{
"children": {
"csp:cluster":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
}
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

sc1 := <-cc.NewSubConnCh
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
pi := balancer.PickInfo{
Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
}
testPick(t, p1, pi, sc1, nil)

// Same cluster, different balancer type.
configJSON2 := `{
"children": {
"csp:cluster":{ "childPolicy": [{"wrappedPickFirstBalancer":""}] }
}
}`
config2, err := rtParser.ParseConfig([]byte(configJSON2))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
sc2 := <-cc.NewSubConnCh
// Update the pick first balancers SubConn as CONNECTING. This will cause
// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
// a Picker update back, as the Graceful Switch process is not complete.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer cancel()
select {
case <-cc.NewPickerCh:
t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
case <-ctx.Done():
}

// Update the pick first balancers SubConn as READY. This will cause
// the pick first balancer to UpdateState() with READY, which should send a
// Picker update back, as the Graceful Switch process is complete. This
// Picker should always pick the pick first's created SubConn.
rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p2 := <-cc.NewPickerCh
testPick(t, p2, pi, sc2, nil)
// The Graceful Switch process completing for the child should cause the
// SubConns for the balancer being gracefully switched from to get deleted.
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("error waiting for RemoveSubConn()")
case rsc := <-cc.RemoveSubConnCh:
// The SubConn removed should have been the created SubConn
// from the child before switching.
if rsc != sc1 {
t.Fatalf("RemoveSubConn() got: %v, want %v", rsc, sc1)
}
}
}

// wrappedPickFirstBalancerBuilder is a test-only balancer.Builder that builds a
// pick first balancer wrapped in a wrappedPickFirstBalancer. It is registered
// with the balancer registry in init() and is selected through the name
// returned by its Name method.
type wrappedPickFirstBalancerBuilder struct{}

// Build returns a wrappedPickFirstBalancer around a freshly built pick first
// balancer. The wrapper itself is passed to pick first as its ClientConn, so
// calls pick first makes on its ClientConn go through the wrapper's methods
// before reaching cc.
func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	wrapper := &wrappedPickFirstBalancer{ClientConn: cc}
	// The wrapper has to exist before the inner balancer is built, because the
	// inner balancer needs the wrapper as its ClientConn.
	wrapper.Balancer = balancer.Get(grpc.PickFirstBalancerName).Build(wrapper, opts)
	return wrapper
}

// Name returns the name of this builder, which is the policy name referenced
// by the childPolicy entries of the test configs.
func (wrappedPickFirstBalancerBuilder) Name() string {
	const name = "wrappedPickFirstBalancer"
	return name
}

// wrappedPickFirstBalancer wraps a pick first balancer and also serves as the
// ClientConn handed to that balancer, which lets its UpdateState method filter
// the state updates that are forwarded to the real ClientConn.
type wrappedPickFirstBalancer struct {
// Balancer is the wrapped pick first balancer; all balancer.Balancer methods
// are promoted from it.
balancer.Balancer
// ClientConn is the ClientConn passed to Build; its methods are promoted
// except for UpdateState, which this type overrides.
balancer.ClientConn
}

// UpdateState forwards state to the wrapped ClientConn unless its connectivity
// state is IDLE, in which case the update is dropped. Dropping IDLE updates
// allows the balancer to be switched over only on a READY SubConn.
func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
	if state.ConnectivityState != connectivity.Idle {
		wb.ClientConn.UpdateState(state)
	}
}