From 42cadc171d4f126ee02e3a395f3e8c04affbbf04 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 30 Mar 2022 12:58:41 -0700 Subject: [PATCH] test: cleanup balancer switching tests (#5271) --- balancer_switching_test.go | 569 --------------------- call_test.go | 211 -------- clientconn.go | 10 +- clientconn_state_transition_test.go | 5 +- clientconn_test.go | 9 + internal/testutils/fakegrpclb/server.go | 249 ++++++++++ interop/fake_grpclb/fake_grpclb.go | 127 +---- reflection/serverreflection_test.go | 2 +- server_test.go | 8 + test/balancer_switching_test.go | 636 ++++++++++++++++++++++++ test/pickfirst_test.go | 16 +- test/resolver_update_test.go | 2 +- test/roundrobin_test.go | 8 +- 13 files changed, 944 insertions(+), 908 deletions(-) delete mode 100644 balancer_switching_test.go delete mode 100644 call_test.go create mode 100644 internal/testutils/fakegrpclb/server.go create mode 100644 test/balancer_switching_test.go diff --git a/balancer_switching_test.go b/balancer_switching_test.go deleted file mode 100644 index 3812bbdc990..00000000000 --- a/balancer_switching_test.go +++ /dev/null @@ -1,569 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package grpc - -import ( - "context" - "fmt" - "math" - "testing" - "time" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal" - "google.golang.org/grpc/internal/balancer/stub" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/status" -) - -var _ balancer.Builder = &magicalLB{} -var _ balancer.Balancer = &magicalLB{} - -// magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package -type magicalLB struct{} - -func (b *magicalLB) Name() string { - return "grpclb" -} - -func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return b -} - -func (b *magicalLB) ResolverError(error) {} - -func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {} - -func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error { - return nil -} - -func (b *magicalLB) Close() {} - -func (b *magicalLB) ExitIdle() {} - -func init() { - balancer.Register(&magicalLB{}) -} - -func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) { - var servers []*server - for i := 0; i < numServers; i++ { - s := newTestServer() - servers = append(servers, s) - go s.start(t, 0, maxStreams) - s.wait(t, 2*time.Second) - } - return servers, func() { - for i := 0; i < numServers; i++ { - servers[i].stop() - } - } -} - -func errorDesc(err error) string { - if s, ok := status.FromError(err); ok { - return s.Message() - } - return err.Error() -} - -func checkPickFirst(cc *ClientConn, servers []*server) error { - var ( - req = "port" - reply string - err error - ) - connected := false - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - for i := 0; i < 5000; i++ { - if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port { - if connected { - // connected is set to false if peer is not server[0]. So if - // connected is true here, this is the second time we saw - // server[0] in a row. Break because pickfirst is in effect. - break - } - connected = true - } else { - connected = false - } - time.Sleep(time.Millisecond) - } - if !connected { - return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port) - } - - // The following RPCs should all succeed with the first server. - for i := 0; i < 3; i++ { - err = cc.Invoke(ctx, "/foo/bar", &req, &reply) - if errorDesc(err) != servers[0].port { - return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err) - } - } - return nil -} - -func checkRoundRobin(cc *ClientConn, servers []*server) error { - var ( - req = "port" - reply string - err error - ) - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - // Make sure connections to all servers are up. - for i := 0; i < 2; i++ { - // Do this check twice, otherwise the first RPC's transport may still be - // picked by the closing pickfirst balancer, and the test becomes flaky. - for _, s := range servers { - var up bool - for i := 0; i < 5000; i++ { - if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port { - up = true - break - } - time.Sleep(time.Millisecond) - } - if !up { - return fmt.Errorf("server %v is not up within 5 second", s.port) - } - } - } - - serverCount := len(servers) - for i := 0; i < 3*serverCount; i++ { - err = cc.Invoke(ctx, "/foo/bar", &req, &reply) - if errorDesc(err) != servers[i%serverCount].port { - return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err) - } - } - return nil -} - -func (s) TestSwitchBalancer(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - const numServers = 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) - defer scleanup() - - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}} - r.UpdateState(resolver.State{Addresses: addrs}) - // The default balancer is pickfirst. - if err := checkPickFirst(cc, servers); err != nil { - t.Fatalf("check pickfirst returned non-nil error: %v", err) - } - // Switch to roundrobin. - cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil) - if err := checkRoundRobin(cc, servers); err != nil { - t.Fatalf("check roundrobin returned non-nil error: %v", err) - } - // Switch to pickfirst. - cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil) - if err := checkPickFirst(cc, servers); err != nil { - t.Fatalf("check pickfirst returned non-nil error: %v", err) - } -} - -// First addr update contains grpclb. -func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - - // ClientConn will switch balancer to grpclb when receives an address of - // type GRPCLB. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}}) - var isGRPCLB bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isGRPCLB = cc.curBalancerName == "grpclb" - cc.mu.Unlock() - if isGRPCLB { - break - } - time.Sleep(time.Millisecond) - } - if !isGRPCLB { - t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) - } - - // New update containing new backend and new grpclb. Should not switch - // balancer. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}}) - for i := 0; i < 200; i++ { - cc.mu.Lock() - isGRPCLB = cc.curBalancerName == "grpclb" - cc.mu.Unlock() - if !isGRPCLB { - break - } - time.Sleep(time.Millisecond) - } - if !isGRPCLB { - t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb") - } - - var isPickFirst bool - // Switch balancer to pickfirst. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isPickFirst = cc.curBalancerName == PickFirstBalancerName - cc.mu.Unlock() - if isPickFirst { - break - } - time.Sleep(time.Millisecond) - } - if !isPickFirst { - t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) - } -} - -// First addr update does not contain grpclb. -func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) - var isPickFirst bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isPickFirst = cc.curBalancerName == PickFirstBalancerName - cc.mu.Unlock() - if isPickFirst { - break - } - time.Sleep(time.Millisecond) - } - if !isPickFirst { - t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) - } - - // ClientConn will switch balancer to grpclb when receives an address of - // type GRPCLB. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}}) - var isGRPCLB bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isGRPCLB = cc.curBalancerName == "grpclb" - cc.mu.Unlock() - if isGRPCLB { - break - } - time.Sleep(time.Millisecond) - } - if !isGRPCLB { - t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) - } - - // New update containing new backend and new grpclb. Should not switch - // balancer. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}}) - for i := 0; i < 200; i++ { - cc.mu.Lock() - isGRPCLB = cc.curBalancerName == "grpclb" - cc.mu.Unlock() - if !isGRPCLB { - break - } - time.Sleep(time.Millisecond) - } - if !isGRPCLB { - t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb") - } - - // Switch balancer back. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isPickFirst = cc.curBalancerName == PickFirstBalancerName - cc.mu.Unlock() - if isPickFirst { - break - } - time.Sleep(time.Millisecond) - } - if !isPickFirst { - t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) - } -} - -// Test that if the current balancer is roundrobin, after switching to grpclb, -// when the resolved address doesn't contain grpclb addresses, balancer will be -// switched back to roundrobin. -func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - - sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`) - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) - var isRoundRobin bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isRoundRobin = cc.curBalancerName == "round_robin" - cc.mu.Unlock() - if isRoundRobin { - break - } - time.Sleep(time.Millisecond) - } - if !isRoundRobin { - t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) - } - - // ClientConn will switch balancer to grpclb when receives an address of - // type GRPCLB. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc}) - var isGRPCLB bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isGRPCLB = cc.curBalancerName == "grpclb" - cc.mu.Unlock() - if isGRPCLB { - break - } - time.Sleep(time.Millisecond) - } - if !isGRPCLB { - t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) - } - - // Switch balancer back. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isRoundRobin = cc.curBalancerName == "round_robin" - cc.mu.Unlock() - if isRoundRobin { - break - } - time.Sleep(time.Millisecond) - } - if !isRoundRobin { - t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) - } -} - -// Test that if resolved address list contains grpclb, the balancer option in -// service config won't take effect. But when there's no grpclb address in a new -// resolved address list, balancer will be switched to the new one. -func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}}) - var isPickFirst bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isPickFirst = cc.curBalancerName == PickFirstBalancerName - cc.mu.Unlock() - if isPickFirst { - break - } - time.Sleep(time.Millisecond) - } - if !isPickFirst { - t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) - } - - // ClientConn will switch balancer to grpclb when receives an address of - // type GRPCLB. - addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}} - r.UpdateState(resolver.State{Addresses: addrs}) - var isGRPCLB bool - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isGRPCLB = cc.curBalancerName == "grpclb" - cc.mu.Unlock() - if isGRPCLB { - break - } - time.Sleep(time.Millisecond) - } - if !isGRPCLB { - t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) - } - - sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`) - r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc}) - var isRoundRobin bool - for i := 0; i < 200; i++ { - cc.mu.Lock() - isRoundRobin = cc.curBalancerName == "round_robin" - cc.mu.Unlock() - if isRoundRobin { - break - } - time.Sleep(time.Millisecond) - } - // Balancer should NOT switch to round_robin because resolved list contains - // grpclb. - if isRoundRobin { - t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb") - } - - // Switch balancer back. - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) - for i := 0; i < 5000; i++ { - cc.mu.Lock() - isRoundRobin = cc.curBalancerName == "round_robin" - cc.mu.Unlock() - if isRoundRobin { - break - } - time.Sleep(time.Millisecond) - } - if !isRoundRobin { - t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) - } -} - -// Test that when switching to grpclb fails because grpclb is not registered, -// the fallback balancer will only get backend addresses, not the grpclb server -// address. -// -// The tests sends 3 server addresses (all backends) as resolved addresses, but -// claim the first one is grpclb server. The all RPCs should all be send to the -// other addresses, not the first one. -func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) { - internal.BalancerUnregister("grpclb") - defer balancer.Register(&magicalLB{}) - - r := manual.NewBuilderWithScheme("whatever") - - const numServers = 3 - servers, scleanup := startServers(t, numServers, math.MaxInt32) - defer scleanup() - - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}}) - // The default balancer is pickfirst. - if err := checkPickFirst(cc, servers[1:]); err != nil { - t.Fatalf("check pickfirst returned non-nil error: %v", err) - } - // Try switching to grpclb by sending servers[0] as grpclb address. It's - // expected that servers[0] will be filtered out, so it will not be used by - // the balancer. - // - // If the filtering failed, servers[0] will be used for RPCs and the RPCs - // will succeed. The following checks will catch this and fail. - addrs := []resolver.Address{ - {Addr: servers[0].addr, Type: resolver.GRPCLB}, - {Addr: servers[1].addr}, {Addr: servers[2].addr}} - r.UpdateState(resolver.State{Addresses: addrs}) - // Still check for pickfirst, but only with server[1] and server[2]. - if err := checkPickFirst(cc, servers[1:]); err != nil { - t.Fatalf("check pickfirst returned non-nil error: %v", err) - } - // Switch to roundrobin, and check against server[1] and server[2]. - cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil) - if err := checkRoundRobin(cc, servers[1:]); err != nil { - t.Fatalf("check roundrobin returned non-nil error: %v", err) - } -} - -const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer" - -func init() { - stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{ - Close: func(data *stub.BalancerData) { - data.ClientConn.RemoveSubConn(&acBalancerWrapper{}) - }, - }) -} - -// Test that when switching to balancers, the old balancer calls RemoveSubConn -// in Close. -// -// This test is to make sure this close doesn't cause a deadlock. -func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r)) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil) - // This service config update will switch balancer from - // "test-inline-remove-subconn-balancer" to "pick_first". The test balancer - // will be closed, which will call cc.RemoveSubConn() inline (this - // RemoveSubConn is not required by the API, but some balancers might do - // it). - // - // This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a - // deadlock (e.g. trying to grab a mutex while it's already locked). - // - // Do it in a goroutine so this test will fail with a helpful message - // (though the goroutine will still leak). - done := make(chan struct{}) - go func() { - cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil) - close(done) - }() - select { - case <-time.After(defaultTestTimeout): - t.Fatalf("timeout waiting for updateResolverState to finish") - case <-done: - } -} - -func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { - scpr := r.CC.ParseServiceConfig(s) - if scpr.Err != nil { - panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err)) - } - return scpr -} diff --git a/call_test.go b/call_test.go deleted file mode 100644 index 3280109f4fb..00000000000 --- a/call_test.go +++ /dev/null @@ -1,211 +0,0 @@ -/* - * - * Copyright 2014 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package grpc - -import ( - "context" - "fmt" - "io" - "math" - "net" - "strconv" - "strings" - "sync" - "testing" - "time" - - "google.golang.org/grpc/codes" - "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/internal/transport" - "google.golang.org/grpc/status" -) - -var ( - expectedRequest = "ping" - expectedResponse = "pong" - weirdError = "format verbs: %v%s" - sizeLargeErr = 1024 * 1024 - canceled = 0 -) - -const defaultTestTimeout = 10 * time.Second - -type testCodec struct { -} - -func (testCodec) Marshal(v interface{}) ([]byte, error) { - return []byte(*(v.(*string))), nil -} - -func (testCodec) Unmarshal(data []byte, v interface{}) error { - *(v.(*string)) = string(data) - return nil -} - -func (testCodec) String() string { - return "test" -} - -type testStreamHandler struct { - port string - t transport.ServerTransport -} - -func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) { - p := &parser{r: s} - for { - pf, req, err := p.recvMsg(math.MaxInt32) - if err == io.EOF { - break - } - if err != nil { - return - } - if pf != compressionNone { - t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone) - return - } - var v string - codec := testCodec{} - if err := codec.Unmarshal(req, &v); err != nil { - t.Errorf("Failed to unmarshal the received message: %v", err) - return - } - if v == "weird error" { - h.t.WriteStatus(s, status.New(codes.Internal, weirdError)) - return - } - if v == "canceled" { - canceled++ - h.t.WriteStatus(s, status.New(codes.Internal, "")) - return - } - if v == "port" { - h.t.WriteStatus(s, status.New(codes.Internal, h.port)) - return - } - - if v != expectedRequest { - h.t.WriteStatus(s, status.New(codes.Internal, strings.Repeat("A", sizeLargeErr))) - return - } - } - // send a response back to end the stream. - data, err := encode(testCodec{}, &expectedResponse) - if err != nil { - t.Errorf("Failed to encode the response: %v", err) - return - } - hdr, payload := msgHeader(data, nil) - h.t.Write(s, hdr, payload, &transport.Options{}) - h.t.WriteStatus(s, status.New(codes.OK, "")) -} - -type server struct { - lis net.Listener - port string - addr string - startedErr chan error // sent nil or an error after server starts - mu sync.Mutex - conns map[transport.ServerTransport]bool - channelzID *channelz.Identifier -} - -func newTestServer() *server { - return &server{ - startedErr: make(chan error, 1), - channelzID: channelz.NewIdentifierForTesting(channelz.RefServer, time.Now().Unix(), nil), - } -} - -// start starts server. Other goroutines should block on s.startedErr for further operations. -func (s *server) start(t *testing.T, port int, maxStreams uint32) { - var err error - if port == 0 { - s.lis, err = net.Listen("tcp", "localhost:0") - } else { - s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port)) - } - if err != nil { - s.startedErr <- fmt.Errorf("failed to listen: %v", err) - return - } - s.addr = s.lis.Addr().String() - _, p, err := net.SplitHostPort(s.addr) - if err != nil { - s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err) - return - } - s.port = p - s.conns = make(map[transport.ServerTransport]bool) - s.startedErr <- nil - for { - conn, err := s.lis.Accept() - if err != nil { - return - } - config := &transport.ServerConfig{ - MaxStreams: maxStreams, - ChannelzParentID: s.channelzID, - } - st, err := transport.NewServerTransport(conn, config) - if err != nil { - t.Errorf("failed to create server transport: %v", err) - continue - } - s.mu.Lock() - if s.conns == nil { - s.mu.Unlock() - st.Close() - return - } - s.conns[st] = true - s.mu.Unlock() - h := &testStreamHandler{ - port: s.port, - t: st, - } - go st.HandleStreams(func(s *transport.Stream) { - go h.handleStream(t, s) - }, func(ctx context.Context, method string) context.Context { - return ctx - }) - } -} - -func (s *server) wait(t *testing.T, timeout time.Duration) { - select { - case err := <-s.startedErr: - if err != nil { - t.Fatal(err) - } - case <-time.After(timeout): - t.Fatalf("Timed out after %v waiting for server to be ready", timeout) - } -} - -func (s *server) stop() { - s.lis.Close() - s.mu.Lock() - for c := range s.conns { - c.Close() - } - s.conns = nil - s.mu.Unlock() -} diff --git a/clientconn.go b/clientconn.go index 565aa247473..86275dca4de 100644 --- a/clientconn.go +++ b/clientconn.go @@ -672,14 +672,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { cc.mu.Unlock() if cbn != grpclbName { // Filter any grpclb addresses since we don't have the grpclb balancer. - for i := 0; i < len(s.Addresses); { - if s.Addresses[i].Type == resolver.GRPCLB { - copy(s.Addresses[i:], s.Addresses[i+1:]) - s.Addresses = s.Addresses[:len(s.Addresses)-1] + var addrs []resolver.Address + for _, addr := range s.Addresses { + if addr.Type == resolver.GRPCLB { continue } - i++ + addrs = append(addrs, addr) } + s.Addresses = addrs } uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) if ret == nil { diff --git a/clientconn_state_transition_test.go b/clientconn_state_transition_test.go index 0944e8434d7..d1c1321b33b 100644 --- a/clientconn_state_transition_test.go +++ b/clientconn_state_transition_test.go @@ -35,7 +35,10 @@ import ( "google.golang.org/grpc/resolver/manual" ) -const stateRecordingBalancerName = "state_recoding_balancer" +const ( + stateRecordingBalancerName = "state_recoding_balancer" + defaultTestTimeout = 10 * time.Second +) var testBalancerBuilder = newStateRecordingBalancerBuilder() diff --git a/clientconn_test.go b/clientconn_test.go index 80547f51037..353d8fb325d 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -40,9 +40,18 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/testdata" ) +func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult { + scpr := r.CC.ParseServiceConfig(s) + if scpr.Err != nil { + panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err)) + } + return scpr +} + func (s) TestDialWithTimeout(t *testing.T) { lis, err := net.Listen("tcp", "localhost:0") if err != nil { diff --git a/internal/testutils/fakegrpclb/server.go b/internal/testutils/fakegrpclb/server.go new file mode 100644 index 00000000000..8e33340484e --- /dev/null +++ b/internal/testutils/fakegrpclb/server.go @@ -0,0 +1,249 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package fakegrpclb provides a fake implementation of the grpclb server. +package fakegrpclb + +import ( + "errors" + "fmt" + "io" + "net" + "strconv" + "sync" + "time" + + "google.golang.org/grpc" + lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" + lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/status" +) + +var logger = grpclog.Component("fake_grpclb") + +// ServerParams wraps options passed while creating a Server. +type ServerParams struct { + ListenPort int // Listening port for the balancer server. + ServerOptions []grpc.ServerOption // gRPC options for the balancer server. + + LoadBalancedServiceName string // Service name being load balanced for. + LoadBalancedServicePort int // Service port being load balanced for. + BackendAddresses []string // Service backends to balance load across. + ShortStream bool // End balancer stream after sending server list. +} + +// Server is a fake implementation of the grpclb LoadBalancer service. It does +// not support stats reporting from clients, and always sends back a static list +// of backends to the client to balance load across. +// +// It is safe for concurrent access. +type Server struct { + lbgrpc.UnimplementedLoadBalancerServer + + // Options copied over from ServerParams passed to NewServer. + sOpts []grpc.ServerOption // gRPC server options. + serviceName string // Service name being load balanced for. + servicePort int // Service port being load balanced for. + shortStream bool // End balancer stream after sending server list. + + // Values initialized using ServerParams passed to NewServer. + backends []*lbpb.Server // Service backends to balance load across. + lis net.Listener // Listener for grpc connections to the LoadBalancer service. + + // mu guards access to below fields. + mu sync.Mutex + grpcServer *grpc.Server // Underlying grpc server. + address string // Actual listening address. + + stopped chan struct{} // Closed when Stop() is called. +} + +// NewServer creates a new Server with passed in params. Returns a non-nil error +// if the params are invalid. +func NewServer(params ServerParams) (*Server, error) { + var servers []*lbpb.Server + for _, addr := range params.BackendAddresses { + ipStr, portStr, err := net.SplitHostPort(addr) + if err != nil { + return nil, fmt.Errorf("failed to parse list of backend address %q: %v", addr, err) + } + ip := net.ParseIP(ipStr) + if ip == nil { + return nil, fmt.Errorf("failed to parse ip: %q", ipStr) + } + port, err := strconv.Atoi(portStr) + if err != nil { + return nil, fmt.Errorf("failed to convert port %q to int", portStr) + } + logger.Infof("Adding backend ip: %q, port: %d to server list", ip.String(), port) + servers = append(servers, &lbpb.Server{ + IpAddress: ip, + Port: int32(port), + }) + } + + lis, err := net.Listen("tcp", ":"+strconv.Itoa(params.ListenPort)) + if err != nil { + return nil, fmt.Errorf("failed to listen on port %q: %v", params.ListenPort, err) + } + + return &Server{ + sOpts: params.ServerOptions, + serviceName: params.LoadBalancedServiceName, + servicePort: params.LoadBalancedServicePort, + shortStream: params.ShortStream, + backends: servers, + lis: lis, + address: lis.Addr().String(), + stopped: make(chan struct{}), + }, nil +} + +// Serve starts serving the LoadBalancer service on a gRPC server. +// +// It returns early with a non-nil error if it is unable to start serving. +// Otherwise, it blocks until Stop() is called, at which point it returns the +// error returned by the underlying grpc.Server's Serve() method. +func (s *Server) Serve() error { + s.mu.Lock() + if s.grpcServer != nil { + s.mu.Unlock() + return errors.New("Serve() called multiple times") + } + + server := grpc.NewServer(s.sOpts...) + s.grpcServer = server + s.mu.Unlock() + + logger.Infof("Begin listening on %s", s.lis.Addr().String()) + lbgrpc.RegisterLoadBalancerServer(server, s) + return server.Serve(s.lis) // This call will block. +} + +// Stop stops serving the LoadBalancer service and unblocks the preceding call +// to Serve(). +func (s *Server) Stop() { + defer close(s.stopped) + s.mu.Lock() + if s.grpcServer != nil { + s.grpcServer.Stop() + s.grpcServer = nil + } + s.mu.Unlock() +} + +// Address returns the host:port on which the LoadBalancer service is serving. +func (s *Server) Address() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.address +} + +// BalanceLoad provides a fake implementation of the LoadBalancer service. +func (s *Server) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error { + logger.Info("New BalancerLoad stream started") + + req, err := stream.Recv() + if err == io.EOF { + logger.Warning("Received EOF when reading from the stream") + return nil + } + if err != nil { + logger.Warning("Failed to read LoadBalanceRequest from stream: %v", err) + return err + } + logger.Infof("Received LoadBalancerRequest:\n%s", pretty.ToJSON(req)) + + // Initial request contains the service being load balanced for. + initialReq := req.GetInitialRequest() + if initialReq == nil { + logger.Info("First message on the stream does not contain an InitialLoadBalanceRequest") + return status.Error(codes.Unknown, "First request not an InitialLoadBalanceRequest") + } + + // Basic validation of the service name and port from the incoming request. + // + // Clients targeting service:port can sometimes include the ":port" suffix in + // their requested names; handle this case. + serviceName, port, err := net.SplitHostPort(initialReq.Name) + if err != nil { + // Requested name did not contain a port. So, use the name as is. + serviceName = initialReq.Name + } else { + p, err := strconv.Atoi(port) + if err != nil { + logger.Info("Failed to parse requested service port %q to integer", port) + return status.Error(codes.Unknown, "Bad requested service port number") + } + if p != s.servicePort { + logger.Info("Requested service port number %q does not match expected", port, s.servicePort) + return status.Error(codes.Unknown, "Bad requested service port number") + } + } + if serviceName != s.serviceName { + logger.Info("Requested service name %q does not match expected %q", serviceName, s.serviceName) + return status.Error(codes.NotFound, "Bad requested service name") + } + + // Empty initial response disables stats reporting from the client. Stats + // reporting from the client is used to determine backend load and is not + // required for the purposes of this fake. + initResp := &lbpb.LoadBalanceResponse{ + LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{ + InitialResponse: &lbpb.InitialLoadBalanceResponse{}, + }, + } + if err := stream.Send(initResp); err != nil { + logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err) + return err + } + + resp := &lbpb.LoadBalanceResponse{ + LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ + ServerList: &lbpb.ServerList{Servers: s.backends}, + }, + } + logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp)) + if err := stream.Send(resp); err != nil { + logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err) + return err + } + + if s.shortStream { + logger.Info("Ending stream early as the short stream option was set") + return nil + } + + for { + select { + case <-stream.Context().Done(): + return nil + case <-s.stopped: + return nil + case <-time.After(10 * time.Second): + logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp)) + if err := stream.Send(resp); err != nil { + logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err) + return err + } + } + } +} diff --git a/interop/fake_grpclb/fake_grpclb.go b/interop/fake_grpclb/fake_grpclb.go index 6804235486b..e29d2f439fa 100644 --- a/interop/fake_grpclb/fake_grpclb.go +++ b/interop/fake_grpclb/fake_grpclb.go @@ -23,18 +23,13 @@ package main import ( "flag" - "net" - "strconv" "strings" - "time" "google.golang.org/grpc" - lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/alts" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/status" + "google.golang.org/grpc/internal/testutils/fakegrpclb" "google.golang.org/grpc/testdata" ) @@ -49,69 +44,9 @@ var ( logger = grpclog.Component("interop") ) -type loadBalancerServer struct { - lbpb.UnimplementedLoadBalancerServer - serverListResponse *lbpb.LoadBalanceResponse -} - -func (l *loadBalancerServer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error { - logger.Info("Begin handling new BalancerLoad request.") - var lbReq *lbpb.LoadBalanceRequest - var err error - if lbReq, err = stream.Recv(); err != nil { - logger.Errorf("Error receiving LoadBalanceRequest: %v", err) - return err - } - logger.Info("LoadBalancerRequest received.") - initialReq := lbReq.GetInitialRequest() - if initialReq == nil { - logger.Info("Expected first request to be an InitialRequest. Got: %v", lbReq) - return status.Error(codes.Unknown, "First request not an InitialRequest") - } - // gRPC clients targeting foo.bar.com:443 can sometimes include the ":443" suffix in - // their requested names; handle this case. TODO: make 443 configurable? - var cleanedName string - var requestedNamePortNumber string - if cleanedName, requestedNamePortNumber, err = net.SplitHostPort(initialReq.Name); err != nil { - cleanedName = initialReq.Name - } else { - if requestedNamePortNumber != "443" { - logger.Info("Bad requested service name port number: %v.", requestedNamePortNumber) - return status.Error(codes.Unknown, "Bad requested service name port number") - } - } - if cleanedName != *serviceName { - logger.Info("Expected requested service name: %v. Got: %v", *serviceName, initialReq.Name) - return status.Error(codes.NotFound, "Bad requested service name") - } - if err := stream.Send(&lbpb.LoadBalanceResponse{ - LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{ - InitialResponse: &lbpb.InitialLoadBalanceResponse{}, - }, - }); err != nil { - logger.Errorf("Error sending initial LB response: %v", err) - return status.Error(codes.Unknown, "Error sending initial response") - } - logger.Info("Send LoadBalanceResponse: %v", l.serverListResponse) - if err := stream.Send(l.serverListResponse); err != nil { - logger.Errorf("Error sending LB response: %v", err) - return status.Error(codes.Unknown, "Error sending response") - } - if *shortStream { - return nil - } - for { - logger.Info("Send LoadBalanceResponse: %v", l.serverListResponse) - if err := stream.Send(l.serverListResponse); err != nil { - logger.Errorf("Error sending LB response: %v", err) - return status.Error(codes.Unknown, "Error sending response") - } - time.Sleep(10 * time.Second) - } -} - func main() { flag.Parse() + var opts []grpc.ServerOption if *useTLS { certFile := testdata.Path("server1.pem") @@ -126,47 +61,23 @@ func main() { altsTC := alts.NewServerCreds(altsOpts) opts = append(opts, grpc.Creds(altsTC)) } - var serverList []*lbpb.Server - if len(*backendAddrs) == 0 { - serverList = make([]*lbpb.Server, 0) - } else { - rawBackendAddrs := strings.Split(*backendAddrs, ",") - serverList = make([]*lbpb.Server, len(rawBackendAddrs)) - for i := range rawBackendAddrs { - rawIP, rawPort, err := net.SplitHostPort(rawBackendAddrs[i]) - if err != nil { - logger.Fatalf("Failed to parse --backend_addrs[%d]=%v, error: %v", i, rawBackendAddrs[i], err) - } - ip := net.ParseIP(rawIP) - if ip == nil { - logger.Fatalf("Failed to parse ip: %v", rawIP) - } - numericPort, err := strconv.Atoi(rawPort) - if err != nil { - logger.Fatalf("Failed to convert port %v to int", rawPort) - } - logger.Infof("Adding backend ip: %v, port: %d", ip.String(), numericPort) - serverList[i] = &lbpb.Server{ - IpAddress: ip, - Port: int32(numericPort), - } - } - } - serverListResponse := &lbpb.LoadBalanceResponse{ - LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{ - ServerList: &lbpb.ServerList{ - Servers: serverList, - }, - }, - } - server := grpc.NewServer(opts...) - logger.Infof("Begin listening on %d.", *port) - lis, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) + + rawBackendAddrs := strings.Split(*backendAddrs, ",") + server, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{ + ListenPort: *port, + ServerOptions: opts, + LoadBalancedServiceName: *serviceName, + LoadBalancedServicePort: 443, // TODO: make this configurable? + BackendAddresses: rawBackendAddrs, + ShortStream: *shortStream, + }) if err != nil { - logger.Fatalf("Failed to listen on port %v: %v", *port, err) + logger.Fatalf("Failed to create balancer server: %v", err) + } + + // Serve() starts serving and blocks until Stop() is called. We don't need to + // call Stop() here since we want the server to run until we are killed. + if err := server.Serve(); err != nil { + logger.Fatalf("Failed to start balancer server: %v", err) } - lbpb.RegisterLoadBalancerServer(server, &loadBalancerServer{ - serverListResponse: serverListResponse, - }) - server.Serve(lis) } diff --git a/reflection/serverreflection_test.go b/reflection/serverreflection_test.go index b35f674d926..3aeac2be072 100644 --- a/reflection/serverreflection_test.go +++ b/reflection/serverreflection_test.go @@ -87,7 +87,7 @@ func loadFileDesc(filename string) (*descriptorpb.FileDescriptorProto, []byte) { func loadFileDescDynamic(b []byte) (*descriptorpb.FileDescriptorProto, protoreflect.FileDescriptor, []byte) { m := new(descriptorpb.FileDescriptorProto) if err := proto.Unmarshal(b, m); err != nil { - panic(fmt.Sprintf("failed to unmarshal dynamic proto raw descriptor")) + panic("failed to unmarshal dynamic proto raw descriptor") } fd, err := protodesc.NewFile(m, nil) diff --git a/server_test.go b/server_test.go index b1593916014..7d4cf7bfc21 100644 --- a/server_test.go +++ b/server_test.go @@ -28,12 +28,20 @@ import ( "time" "google.golang.org/grpc/internal/transport" + "google.golang.org/grpc/status" ) type emptyServiceServer interface{} type testServer struct{} +func errorDesc(err error) string { + if s, ok := status.FromError(err); ok { + return s.Message() + } + return err.Error() +} + func (s) TestStopBeforeServe(t *testing.T) { lis, err := net.Listen("tcp", "localhost:0") if err != nil { diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go new file mode 100644 index 00000000000..2453738bfa4 --- /dev/null +++ b/test/balancer_switching_test.go @@ -0,0 +1,636 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/fakegrpclb" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +const ( + loadBalancedServiceName = "foo.bar.service" + loadBalancedServicePort = 443 + wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"` + wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"` + wantPickFirstTraceDesc = `Channel switches to new LB policy "pick_first"` +) + +// setupBackendsAndFakeGRPCLB sets up the stub server backends and a fake grpclb +// server for tests which exercise balancer switch scenarios involving grpclb. +// Returns a cleanup function to be invoked by the caller. +func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) { + czCleanup := channelz.NewChannelzStorageForTesting() + backends, backendsCleanup := startBackendsForBalancerSwitch(t) + rawAddrs := stubBackendsToRawAddrs(backends) + + lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{ + LoadBalancedServiceName: loadBalancedServiceName, + LoadBalancedServicePort: loadBalancedServicePort, + BackendAddresses: rawAddrs, + }) + if err != nil { + t.Fatalf("failed to create fake grpclb server: %v", err) + } + go func() { + if err := lbServer.Serve(); err != nil { + t.Errorf("fake grpclb Serve() failed: %v", err) + } + }() + + return backends, lbServer, func() { + backendsCleanup() + lbServer.Stop() + czCleanupWrapper(czCleanup, t) + } +} + +// startBackendsForBalancerSwitch spins up a bunch of stub server backends +// exposing the TestService. Returns a cleanup function to be invoked by the +// caller. +func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) { + t.Helper() + + const backendCount = 3 + backends := make([]*stubserver.StubServer, backendCount) + for i := 0; i < backendCount; i++ { + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Logf("Started TestService backend at: %q", backend.Address) + backends[i] = backend + } + return backends, func() { + for _, b := range backends { + b.Stop() + } + } +} + +// stubBackendsToRawAddrs converts from a set of stub server backends to raw +// address strings. Useful when pushing addresses to the fake grpclb server. +func stubBackendsToRawAddrs(backends []*stubserver.StubServer) []string { + addrs := make([]string, len(backends)) + for i, backend := range backends { + addrs[i] = backend.Address + } + return addrs +} + +// checkForTraceEvent looks for a trace event in the top level channel matching +// the given description. Events before since are ignored. Returns nil error if +// such an event is found. +func checkForTraceEvent(ctx context.Context, wantDesc string, since time.Time) error { + for { + if err := ctx.Err(); err != nil { + return err + } + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + return fmt.Errorf("channelz returned %d top channels, want 1", len(tcs)) + } + for _, event := range tcs[0].Trace.Events { + if event.Timestamp.Before(since) { + continue + } + if strings.Contains(event.Desc, wantDesc) { + return nil + } + } + time.Sleep(defaultTestShortTimeout) + } +} + +// TestBalancerSwitch_Basic tests the basic scenario of switching from one LB +// policy to another, as specified in the service config. +func (s) TestBalancerSwitch_Basic(t *testing.T) { + backends, cleanup := startBackendsForBalancerSwitch(t) + defer cleanup() + addrs := stubBackendsToResolverAddrs(backends) + + r := manual.NewBuilderWithScheme("whatever") + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update without an LB policy in the service config. The + // channel should pick the default LB policy, which is pick_first. + r.UpdateState(resolver.State{Addresses: addrs}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } + + // Push a resolver update with the service config specifying "round_robin". + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), + }) + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Push a resolver update with the service config specifying "pick_first". + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig), + }) + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } +} + +// TestBalancerSwitch_grpclbToPickFirst tests the scenario where the channel +// starts off "grpclb", switches to "pick_first" and back. +func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) { + backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t) + defer cleanup() + + addrs := stubBackendsToResolverAddrs(backends) + r := manual.NewBuilderWithScheme("whatever") + target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName) + cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update with no service config and a single address pointing + // to the grpclb server we created above. This will cause the channel to + // switch to the "grpclb" balancer, and will equally distribute RPCs across + // the backends as the fake grpclb server does not support load reporting from + // the clients. + now := time.Now() + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + } + + // Push a resolver update containing a non-existent grpclb server address. + // This should not lead to a balancer switch. + now = time.Now() + const nonExistentServer = "non-existent-grpclb-server-address" + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}}) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer) + if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil { + t.Fatal("channel switched balancers when expected not to") + } + + // Push a resolver update containing no grpclb server address. This should + // lead to the channel using the default LB policy which is pick_first. + now = time.Now() + r.UpdateState(resolver.State{Addresses: addrs}) + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } + if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) + } +} + +// TestBalancerSwitch_pickFirstToGRPCLB tests the scenario where the channel +// starts off with "pick_first", switches to "grpclb" and back. +func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) { + backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t) + defer cleanup() + + addrs := stubBackendsToResolverAddrs(backends) + r := manual.NewBuilderWithScheme("whatever") + target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName) + cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update containing no grpclb server address. This should + // lead to the channel using the default LB policy which is pick_first. + now := time.Now() + r.UpdateState(resolver.State{Addresses: addrs}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) + } + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } + + // Push a resolver update with no service config and a single address pointing + // to the grpclb server we created above. This will cause the channel to + // switch to the "grpclb" balancer, and will equally distribute RPCs across + // the backends as the fake grpclb server does not support load reporting from + // the clients. + now = time.Now() + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) + if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Push a resolver update containing a non-existent grpclb server address. + // This should not lead to a balancer switch. + now = time.Now() + const nonExistentServer = "non-existent-grpclb-server-address" + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}}) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer) + if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil { + t.Fatal("channel switched balancers when expected not to") + } + + // Switch to "pick_first" again by sending no grpclb server addresses. + now = time.Now() + r.UpdateState(resolver.State{Addresses: addrs}) + if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) + } + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } +} + +// TestBalancerSwitch_RoundRobinToGRPCLB tests the scenario where the channel +// starts off with "round_robin", switches to "grpclb" and back. +// +// Note that this test uses the deprecated `loadBalancingPolicy` field in the +// service config. +func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) { + backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t) + defer cleanup() + + addrs := stubBackendsToResolverAddrs(backends) + r := manual.NewBuilderWithScheme("whatever") + target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName) + cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Note the use of the deprecated `loadBalancingPolicy` field here instead + // of the now recommended `loadBalancingConfig` field. The logic in the + // ClientConn which decides which balancer to switch to looks at the + // following places in the given order of preference: + // - `loadBalancingConfig` field + // - addresses of type grpclb + // - `loadBalancingPolicy` field + // If we use the `loadBalancingPolicy` field, the switch to "grpclb" later on + // in the test will not happen as the ClientConn will continue to use the LB + // policy received in the first update. + scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`) + + // Push a resolver update with the service config specifying "round_robin". + now := time.Now() + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: scpr, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Push a resolver update with no service config and a single address pointing + // to the grpclb server we created above. This will cause the channel to + // switch to the "grpclb" balancer, and will equally distribute RPCs across + // the backends as the fake grpclb server does not support load reporting from + // the clients. + now = time.Now() + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}, + ServiceConfig: scpr, + }) + if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Switch back to "round_robin". + now = time.Now() + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: scpr, + }) + if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } +} + +// TestBalancerSwitch_grpclbNotRegistered tests the scenario where the grpclb +// balancer is not registered. Verifies that the ClientConn fallbacks to the +// default LB policy or the LB policy specified in the service config, and that +// addresses of type "grpclb" are filtered out. +func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) { + // Unregister the grpclb balancer builder for the duration of this test. + grpclbBuilder := balancer.Get("grpclb") + internal.BalancerUnregister(grpclbBuilder.Name()) + defer balancer.Register(grpclbBuilder) + + backends, cleanup := startBackendsForBalancerSwitch(t) + defer cleanup() + addrs := stubBackendsToResolverAddrs(backends) + + r := manual.NewBuilderWithScheme("whatever") + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update which contains a bunch of stub server backends and a + // grpclb server address. The latter should get the ClientConn to try and + // apply the grpclb policy. But since grpclb is not registered, it should + // fallback to the default LB policy which is pick_first. The ClientConn is + // also expected to filter out the grpclb address when sending the addresses + // list fo pick_first. + grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}} + addrs = append(grpclbAddr, addrs...) + now := time.Now() + r.UpdateState(resolver.State{Addresses: addrs}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil { + t.Fatal(err) + } + if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) + } + + // Push a resolver update with the same addresses, but with a service config + // specifying "round_robin". The ClientConn is expected to filter out the + // grpclb address when sending the addresses list to round_robin. + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), + }) + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + t.Fatal(err) + } +} + +// TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy verifies that +// if the resolver update contains any addresses of type "grpclb", it overrides +// the LB policy specifies in the deprecated `loadBalancingPolicy` field of the +// service config. +func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing.T) { + backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t) + defer cleanup() + + addrs := stubBackendsToResolverAddrs(backends) + r := manual.NewBuilderWithScheme("whatever") + target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName) + cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update containing no grpclb server address. This should + // lead to the channel using the default LB policy which is pick_first. + now := time.Now() + r.UpdateState(resolver.State{Addresses: addrs}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) + } + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } + + // Push a resolver update with no service config. The addresses list contains + // the stub backend addresses and a single address pointing to the grpclb + // server we created above. This will cause the channel to switch to the + // "grpclb" balancer, and will equally distribute RPCs across the backends. + now = time.Now() + r.UpdateState(resolver.State{ + Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), + }) + if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Push a resolver update with a service config using the deprecated + // `loadBalancingPolicy` field pointing to round_robin. The addresses list + // contains an address of type "grpclb". This should be preferred and hence + // there should be no balancer switch. + scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`) + now = time.Now() + r.UpdateState(resolver.State{ + Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), + ServiceConfig: scpr, + }) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil { + t.Fatal("channel switched balancers when expected not to") + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Switch to "round_robin" by removing the address of type "grpclb". + now = time.Now() + r.UpdateState(resolver.State{Addresses: addrs}) + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) + } +} + +// TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the +// `loadBalancingConfig` field in the service config trumps over addresses of +// type "grpclb" when it comes to deciding which LB policy is applied on the +// channel. +func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) { + backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t) + defer cleanup() + + addrs := stubBackendsToResolverAddrs(backends) + r := manual.NewBuilderWithScheme("whatever") + target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName) + cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update with no service config and a single address pointing + // to the grpclb server we created above. This will cause the channel to + // switch to the "grpclb" balancer, and will equally distribute RPCs across + // the backends as the fake grpclb server does not support load reporting from + // the clients. + now := time.Now() + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + } + + // Push a resolver update with the service config specifying "round_robin" + // through the recommended `loadBalancingConfig` field. + now = time.Now() + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), + }) + if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Push a resolver update with no service config and an address of type + // "grpclb". The ClientConn should continue to use the service config received + // earlier, which specified the use of "round_robin" through the + // `loadBalancingConfig` field, and therefore the balancer should not be + // switched. And because the `loadBalancingConfig` field trumps everything + // else, the address of type "grpclb" should be ignored. + grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB} + now = time.Now() + r.UpdateState(resolver.State{Addresses: append(addrs, grpclbAddr)}) + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil { + t.Fatal("channel switched balancers when expected not to") + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } +} + +// TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose tests the scenario +// where the balancer being switched out calls RemoveSubConn() in its Close() +// method. Verifies that this sequence of calls doesn't lead to a deadlock. +func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) { + // Register a stub balancer which calls RemoveSubConn() from its Close(). + scChan := make(chan balancer.SubConn, 1) + uccsCalled := make(chan struct{}, 1) + stub.Register(t.Name(), stub.BalancerFuncs{ + UpdateClientConnState: func(data *stub.BalancerData, ccs balancer.ClientConnState) error { + sc, err := data.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("failed to create subConn: %v", err) + } + scChan <- sc + close(uccsCalled) + return nil + }, + Close: func(data *stub.BalancerData) { + data.ClientConn.RemoveSubConn(<-scChan) + }, + }) + + r := manual.NewBuilderWithScheme("whatever") + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update specifying our stub balancer as the LB policy. + scpr := parseServiceConfig(t, r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, t.Name())) + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: "dummy-address"}}, + ServiceConfig: scpr, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case <-ctx.Done(): + t.Fatalf("timeout waiting for UpdateClientConnState to be called: %v", ctx.Err()) + case <-uccsCalled: + } + + // The following service config update will switch balancer from our stub + // balancer to pick_first. The former will be closed, which will call + // cc.RemoveSubConn() inline (this RemoveSubConn is not required by the API, + // but some balancers might do it). + // + // This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a + // deadlock (e.g. trying to grab a mutex while it's already locked). + // + // Do it in a goroutine so this test will fail with a helpful message + // (though the goroutine will still leak). + done := make(chan struct{}) + go func() { + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: "dummy-address"}}, + ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig), + }) + close(done) + }() + + select { + case <-ctx.Done(): + t.Fatalf("timeout waiting for resolver.UpdateState to finish: %v", ctx.Err()) + case <-done: + } +} diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index c88fdac1e72..00a40055ea7 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -128,9 +128,9 @@ func checkPickFirst(ctx context.Context, cc *grpc.ClientConn, wantAddr string) e return nil } -// backendsToAddrs is a helper routine to convert from a set of backends to +// stubBackendsToResolverAddrs converts from a set of stub server backends to // resolver addresses. Useful when pushing addresses to the manual resolver. -func backendsToAddrs(backends []*stubserver.StubServer) []resolver.Address { +func stubBackendsToResolverAddrs(backends []*stubserver.StubServer) []resolver.Address { addrs := make([]resolver.Address, len(backends)) for i, backend := range backends { addrs[i] = resolver.Address{Addr: backend.Address} @@ -143,7 +143,7 @@ func backendsToAddrs(backends []*stubserver.StubServer) []resolver.Address { func (s) TestPickFirst_OneBackend(t *testing.T) { cc, r, backends := setupPickFirst(t, 1) - addrs := backendsToAddrs(backends) + addrs := stubBackendsToResolverAddrs(backends) r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -158,7 +158,7 @@ func (s) TestPickFirst_OneBackend(t *testing.T) { func (s) TestPickFirst_MultipleBackends(t *testing.T) { cc, r, backends := setupPickFirst(t, 2) - addrs := backendsToAddrs(backends) + addrs := stubBackendsToResolverAddrs(backends) r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -174,7 +174,7 @@ func (s) TestPickFirst_MultipleBackends(t *testing.T) { func (s) TestPickFirst_OneServerDown(t *testing.T) { cc, r, backends := setupPickFirst(t, 2) - addrs := backendsToAddrs(backends) + addrs := stubBackendsToResolverAddrs(backends) r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -197,7 +197,7 @@ func (s) TestPickFirst_OneServerDown(t *testing.T) { func (s) TestPickFirst_AllServersDown(t *testing.T) { cc, r, backends := setupPickFirst(t, 2) - addrs := backendsToAddrs(backends) + addrs := stubBackendsToResolverAddrs(backends) r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -228,7 +228,7 @@ func (s) TestPickFirst_AllServersDown(t *testing.T) { func (s) TestPickFirst_AddressesRemoved(t *testing.T) { cc, r, backends := setupPickFirst(t, 3) - addrs := backendsToAddrs(backends) + addrs := stubBackendsToResolverAddrs(backends) r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -273,7 +273,7 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) { // backends are added, the RPC is able to complete. func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) { cc, r, backends := setupPickFirst(t, 2) - addrs := backendsToAddrs(backends) + addrs := stubBackendsToResolverAddrs(backends) r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) diff --git a/test/resolver_update_test.go b/test/resolver_update_test.go index b2443fdd811..bf7f0d2c2ed 100644 --- a/test/resolver_update_test.go +++ b/test/resolver_update_test.go @@ -129,7 +129,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) { // The wrappingBalancer wraps a pick_first balancer and writes to a channel when // it receives a ClientConn update. This is different to a stub balancer which -// only notifies of updates from grpc, but does not contain a real balanacer. +// only notifies of updates from grpc, but does not contain a real balancer. // // The wrappingBalancer allows us to write tests with a real backend and make // real RPCs. diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go index a7b94197ffd..7f16aa2cb3c 100644 --- a/test/roundrobin_test.go +++ b/test/roundrobin_test.go @@ -43,7 +43,8 @@ import ( const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}` -func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error { +func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.Address) error { + client := testgrpc.NewTestServiceClient(cc) var peer peer.Peer // Make sure connections to all backends are up. backendCount := len(addrs) @@ -126,7 +127,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt } r.UpdateState(resolver.State{Addresses: addrs}) - if err := checkRoundRobin(ctx, client, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs); err != nil { t.Fatal(err) } return cc, r, backends @@ -244,8 +245,7 @@ func (s) TestRoundRobin_OneServerDown(t *testing.T) { for i := 0; i < len(backends)-1; i++ { addrs[i] = resolver.Address{Addr: backends[i].Address} } - client := testpb.NewTestServiceClient(cc) - if err := checkRoundRobin(ctx, client, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs); err != nil { t.Fatalf("RPCs are not being round robined across remaining servers: %v", err) } }