From eead9a824c37eadae1d91de892ede9797084d78e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 10 Mar 2022 13:15:11 -0800 Subject: [PATCH] grpc: delete deprecated API WithBalancerName() (#5232) --- balancer_switching_test.go | 43 +--- clientconn.go | 51 ++-- clientconn_state_transition_test.go | 99 ++++--- clientconn_test.go | 62 ++--- dialoptions.go | 43 +--- test/balancer_test.go | 14 +- test/end2end_test.go | 2 +- test/healthcheck_test.go | 386 ++++++++++------------------ 8 files changed, 254 insertions(+), 446 deletions(-) diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 5d9a1f9fffc..3c12dd2e4ee 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -26,7 +26,7 @@ import ( "time" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/resolver" @@ -161,7 +161,7 @@ func (s) TestSwitchBalancer(t *testing.T) { servers, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -184,38 +184,11 @@ func (s) TestSwitchBalancer(t *testing.T) { } } -// Test that balancer specified by dial option will not be overridden. -func (s) TestBalancerDialOption(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - const numServers = 2 - servers, scleanup := startServers(t, numServers, math.MaxInt32) - defer scleanup() - - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name)) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}} - r.UpdateState(resolver.State{Addresses: addrs}) - // The init balancer is roundrobin. - if err := checkRoundRobin(cc, servers); err != nil { - t.Fatalf("check roundrobin returned non-nil error: %v", err) - } - // Switch to pickfirst. - cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil) - // Balancer is still roundrobin. - if err := checkRoundRobin(cc, servers); err != nil { - t.Fatalf("check roundrobin returned non-nil error: %v", err) - } -} - // First addr update contains grpclb. func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -275,7 +248,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) { func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -351,7 +324,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) { func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -413,7 +386,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) { func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -503,7 +476,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) { servers, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -550,7 +523,7 @@ func init() { // This test is to make sure this close doesn't cause a deadlock. func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r)) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/clientconn.go b/clientconn.go index c084a9013e4..dd12a14f09a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -674,7 +674,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { } var balCfg serviceconfig.LoadBalancingConfig - if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { + if cc.sc != nil && cc.sc.lbConfig != nil { balCfg = cc.sc.lbConfig.cfg } @@ -714,10 +714,6 @@ func (cc *ClientConn) switchBalancer(name string) { } channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name) - if cc.dopts.balancerBuilder != nil { - channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead") - return - } if cc.balancerWrapper != nil { // Don't hold cc.mu while closing the balancers. The balancers may call // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex @@ -999,35 +995,28 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel cc.retryThrottler.Store((*retryThrottler)(nil)) } - if cc.dopts.balancerBuilder == nil { - // Only look at balancer types and switch balancer if balancer dial - // option is not set. - var newBalancerName string - if cc.sc != nil && cc.sc.lbConfig != nil { - newBalancerName = cc.sc.lbConfig.name - } else { - var isGRPCLB bool - for _, a := range addrs { - if a.Type == resolver.GRPCLB { - isGRPCLB = true - break - } - } - if isGRPCLB { - newBalancerName = grpclbName - } else if cc.sc != nil && cc.sc.LB != nil { - newBalancerName = *cc.sc.LB - } else { - newBalancerName = PickFirstBalancerName + // Only look at balancer types and switch balancer if balancer dial + // option is not set. + var newBalancerName string + if cc.sc != nil && cc.sc.lbConfig != nil { + newBalancerName = cc.sc.lbConfig.name + } else { + var isGRPCLB bool + for _, a := range addrs { + if a.Type == resolver.GRPCLB { + isGRPCLB = true + break } } - cc.switchBalancer(newBalancerName) - } else if cc.balancerWrapper == nil { - // Balancer dial option was set, and this is the first time handling - // resolved addresses. Build a balancer with dopts.balancerBuilder. - cc.curBalancerName = cc.dopts.balancerBuilder.Name() - cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) + if isGRPCLB { + newBalancerName = grpclbName + } else if cc.sc != nil && cc.sc.LB != nil { + newBalancerName = *cc.sc.LB + } else { + newBalancerName = PickFirstBalancerName + } } + cc.switchBalancer(newBalancerName) } func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { diff --git a/clientconn_state_transition_test.go b/clientconn_state_transition_test.go index 2090c8de689..0944e8434d7 100644 --- a/clientconn_state_transition_test.go +++ b/clientconn_state_transition_test.go @@ -20,6 +20,7 @@ package grpc import ( "context" + "fmt" "net" "sync" "testing" @@ -28,6 +29,7 @@ import ( "golang.org/x/net/http2" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -141,9 +143,6 @@ client enters TRANSIENT FAILURE.`, } func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - pl := testutils.NewPipeListener() defer pl.Close() @@ -156,10 +155,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s connMu.Unlock() }() - client, err := DialContext(ctx, - "", - WithInsecure(), - WithBalancerName(stateRecordingBalancerName), + client, err := Dial("", + WithTransportCredentials(insecure.NewCredentials()), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), WithDialer(pl.Dialer()), withBackoff(noBackoff{}), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 })) @@ -170,12 +168,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s go stayConnected(client) stateNotifications := testBalancerBuilder.nextStateNotifier() - - timeout := time.After(5 * time.Second) - for i := 0; i < len(want); i++ { select { - case <-timeout: + case <-time.After(defaultTestTimeout): t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) case seen := <-stateNotifications: if seen != want[i] { @@ -196,16 +191,6 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s // When a READY connection is closed, the client enters IDLE then CONNECTING. func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { - want := []connectivity.State{ - connectivity.Connecting, - connectivity.Ready, - connectivity.Idle, - connectivity.Connecting, - } - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Error while listening. Err: %v", err) @@ -237,7 +222,9 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { conn.Close() }() - client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName)) + client, err := Dial(lis.Addr().String(), + WithTransportCredentials(insecure.NewCredentials()), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName))) if err != nil { t.Fatal(err) } @@ -246,11 +233,15 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { stateNotifications := testBalancerBuilder.nextStateNotifier() - timeout := time.After(5 * time.Second) - + want := []connectivity.State{ + connectivity.Connecting, + connectivity.Ready, + connectivity.Idle, + connectivity.Connecting, + } for i := 0; i < len(want); i++ { select { - case <-timeout: + case <-time.After(defaultTestTimeout): t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) case seen := <-stateNotifications: if seen == connectivity.Ready { @@ -266,14 +257,6 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) { // When the first connection is closed, the client stays in CONNECTING until it // tries the second address (which succeeds, and then it enters READY). func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) { - want := []connectivity.State{ - connectivity.Connecting, - connectivity.Ready, - } - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - lis1, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Error while listening. Err: %v", err) @@ -324,19 +307,25 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {Addr: lis1.Addr().String()}, {Addr: lis2.Addr().String()}, }}) - client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb)) + client, err := Dial("whatever:///this-gets-overwritten", + WithTransportCredentials(insecure.NewCredentials()), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), + WithResolvers(rb)) if err != nil { t.Fatal(err) } defer client.Close() stateNotifications := testBalancerBuilder.nextStateNotifier() - - timeout := time.After(5 * time.Second) - + want := []connectivity.State{ + connectivity.Connecting, + connectivity.Ready, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i := 0; i < len(want); i++ { select { - case <-timeout: + case <-ctx.Done(): t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) case seen := <-stateNotifications: if seen != want[i] { @@ -345,12 +334,12 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) } } select { - case <-timeout: + case <-ctx.Done(): t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") case <-server1Done: } select { - case <-timeout: + case <-ctx.Done(): t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2") case <-server2Done: } @@ -359,16 +348,6 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) // When there are multiple addresses, and we enter READY on one of them, a // later closure should cause the client to enter CONNECTING func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { - want := []connectivity.State{ - connectivity.Connecting, - connectivity.Ready, - connectivity.Idle, - connectivity.Connecting, - } - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - lis1, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Error while listening. Err: %v", err) @@ -414,7 +393,10 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { {Addr: lis1.Addr().String()}, {Addr: lis2.Addr().String()}, }}) - client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb)) + client, err := Dial("whatever:///this-gets-overwritten", + WithTransportCredentials(insecure.NewCredentials()), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), + WithResolvers(rb)) if err != nil { t.Fatal(err) } @@ -422,12 +404,17 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { go stayConnected(client) stateNotifications := testBalancerBuilder.nextStateNotifier() - - timeout := time.After(2 * time.Second) - + want := []connectivity.State{ + connectivity.Connecting, + connectivity.Ready, + connectivity.Idle, + connectivity.Connecting, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() for i := 0; i < len(want); i++ { select { - case <-timeout: + case <-ctx.Done(): t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want) case seen := <-stateNotifications: if seen == connectivity.Ready { @@ -439,7 +426,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { } } select { - case <-timeout: + case <-ctx.Done(): t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1") case <-server1Done: } diff --git a/clientconn_test.go b/clientconn_test.go index ee39370a87f..80547f51037 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/backoff" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" internalbackoff "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/transport" @@ -69,7 +70,7 @@ func (s) TestDialWithTimeout(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}}) - client, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithTimeout(5*time.Second)) + client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithTimeout(5*time.Second)) close(dialDone) if err != nil { t.Fatalf("Dial failed. Err: %v", err) @@ -121,7 +122,7 @@ func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") r.InitialState(resolver.State{Addresses: []resolver.Address{lis1Addr, lis2Addr}}) - client, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) + client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r)) if err != nil { t.Fatalf("Dial failed. Err: %v", err) } @@ -171,7 +172,7 @@ func (s) TestDialWaitsForServerSettings(t *testing.T) { }() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock()) + client, err := DialContext(ctx, lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithBlock()) close(dialDone) if err != nil { t.Fatalf("Error while dialing. Err: %v", err) @@ -209,7 +210,7 @@ func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) { defer cancel() client, err := DialContext(ctx, lis.Addr().String(), - WithInsecure(), + WithTransportCredentials(insecure.NewCredentials()), WithReturnConnectionError(), withBackoff(noBackoff{}), withMinConnectDeadline(func() time.Duration { return time.Second / 4 })) @@ -286,7 +287,7 @@ func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { break } }() - client, err := Dial(lis.Addr().String(), WithInsecure(), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 500 })) + client, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 500 })) if err != nil { t.Fatalf("Error while dialing. Err: %v", err) } @@ -342,7 +343,7 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { prevAt = meow } }() - cc, err := Dial(lis.Addr().String(), WithInsecure()) + cc, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("Error while dialing. Err: %v", err) } @@ -352,7 +353,10 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { } func (s) TestWithTimeout(t *testing.T) { - conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure()) + conn, err := Dial("passthrough:///Non-Existent.Server:80", + WithTimeout(time.Millisecond), + WithBlock(), + WithTransportCredentials(insecure.NewCredentials())) if err == nil { conn.Close() } @@ -439,8 +443,8 @@ func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) { {Addr: lis2.Addr().String()}, }}) client, err := DialContext(ctx, "whatever:///this-gets-overwritten", - WithInsecure(), - WithBalancerName(stateRecordingBalancerName), + WithTransportCredentials(insecure.NewCredentials()), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), WithResolvers(rb), withMinConnectDeadline(getMinConnectTimeout)) if err != nil { @@ -466,7 +470,7 @@ func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) { func (s) TestDialContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure()); err != context.Canceled { + if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials())); err != context.Canceled { t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled) } } @@ -484,7 +488,7 @@ func (s) TestDialContextFailFast(t *testing.T) { return nil, failErr } - _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithDialer(dialer), FailOnNonTempDialError(true)) + _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), FailOnNonTempDialError(true)) if terr, ok := err.(transport.ConnectionError); !ok || terr.Origin() != failErr { t.Fatalf("DialContext() = _, %v, want _, %v", err, failErr) } @@ -529,7 +533,7 @@ func (s) TestCredentialsMisuse(t *testing.T) { // Use of perRPC creds requiring transport security over an insecure // transport must fail. - if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithInsecure()); err != errTransportCredentialsMissing { + if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithTransportCredentials(insecure.NewCredentials())); err != errTransportCredentialsMissing { t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing) } @@ -573,7 +577,7 @@ func (s) TestWithConnectParams(t *testing.T) { } func testBackoffConfigSet(t *testing.T, wantBackoff internalbackoff.Exponential, opts ...DialOption) { - opts = append(opts, WithInsecure()) + opts = append(opts, WithTransportCredentials(insecure.NewCredentials())) conn, err := Dial("passthrough:///foo:80", opts...) if err != nil { t.Fatalf("unexpected error dialing connection: %v", err) @@ -597,7 +601,7 @@ func testBackoffConfigSet(t *testing.T, wantBackoff internalbackoff.Exponential, func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) { // Default value specified for minConnectTimeout in the spec is 20 seconds. mct := 1 * time.Minute - conn, err := Dial("passthrough:///foo:80", WithInsecure(), WithConnectParams(ConnectParams{MinConnectTimeout: mct})) + conn, err := Dial("passthrough:///foo:80", WithTransportCredentials(insecure.NewCredentials()), WithConnectParams(ConnectParams{MinConnectTimeout: mct})) if err != nil { t.Fatalf("unexpected error dialing connection: %v", err) } @@ -611,7 +615,7 @@ func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) { func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -628,7 +632,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) { for i := 0; i < 10; i++ { // Run this multiple times to make sure it doesn't panic. r := manual.NewBuilderWithScheme(fmt.Sprintf("whatever-%d", i)) - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -641,7 +645,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) { func (s) TestResolverEmptyUpdateNotPanic(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r)) + cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -691,7 +695,7 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) { addr := lis.Addr().String() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - cc, err := DialContext(ctx, addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{ + cc, err := DialContext(ctx, addr, WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithKeepaliveParams(keepalive.ClientParameters{ Time: 10 * time.Second, Timeout: 100 * time.Millisecond, PermitWithoutStream: true, @@ -720,7 +724,7 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) { func (s) TestDisableServiceConfigOption(t *testing.T) { r := manual.NewBuilderWithScheme("whatever") addr := r.Scheme() + ":///non.existent" - cc, err := Dial(addr, WithInsecure(), WithResolvers(r), WithDisableServiceConfig()) + cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDisableServiceConfig()) if err != nil { t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) } @@ -747,7 +751,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) { func (s) TestMethodConfigDefaultService(t *testing.T) { addr := "nonexist:///non.existent" - cc, err := Dial(addr, WithInsecure(), WithDefaultServiceConfig(`{ + cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDefaultServiceConfig(`{ "methodConfig": [{ "name": [ { @@ -770,7 +774,7 @@ func (s) TestMethodConfigDefaultService(t *testing.T) { func (s) TestGetClientConnTarget(t *testing.T) { addr := "nonexist:///non.existent" - cc, err := Dial(addr, WithInsecure()) + cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) } @@ -796,7 +800,7 @@ func (s) TestResetConnectBackoff(t *testing.T) { dials <- struct{}{} return nil, errors.New("failed to fake dial") } - cc, err := Dial("any", WithInsecure(), WithDialer(dialer), withBackoff(backoffForever{})) + cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), withBackoff(backoffForever{})) if err != nil { t.Fatalf("Dial() = _, %v; want _, nil", err) } @@ -825,7 +829,7 @@ func (s) TestResetConnectBackoff(t *testing.T) { func (s) TestBackoffCancel(t *testing.T) { dialStrCh := make(chan string) - cc, err := Dial("any", WithInsecure(), WithDialer(func(t string, _ time.Duration) (net.Conn, error) { + cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(func(t string, _ time.Duration) (net.Conn, error) { dialStrCh <- t return nil, fmt.Errorf("test dialer, always error") })) @@ -956,10 +960,10 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) { rb.InitialState(resolver.State{Addresses: addrsList}) client, err := Dial("whatever:///this-gets-overwritten", - WithInsecure(), + WithTransportCredentials(insecure.NewCredentials()), WithResolvers(rb), withBackoff(noBackoff{}), - WithBalancerName(stateRecordingBalancerName), + WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)), withMinConnectDeadline(func() time.Duration { return time.Hour })) if err != nil { t.Fatal(err) @@ -1044,14 +1048,14 @@ func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool { } func testInvalidDefaultServiceConfig(t *testing.T) { - _, err := Dial("fake.com", WithInsecure(), WithDefaultServiceConfig("")) + _, err := Dial("fake.com", WithTransportCredentials(insecure.NewCredentials()), WithDefaultServiceConfig("")) if !strings.Contains(err.Error(), invalidDefaultServiceConfigErrPrefix) { t.Fatalf("Dial got err: %v, want err contains: %v", err, invalidDefaultServiceConfigErrPrefix) } } func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r *manual.Resolver, addr string, js string) { - cc, err := Dial(addr, WithInsecure(), WithDisableServiceConfig(), WithResolvers(r), WithDefaultServiceConfig(js)) + cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDisableServiceConfig(), WithResolvers(r), WithDefaultServiceConfig(js)) if err != nil { t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) } @@ -1067,7 +1071,7 @@ func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r * } func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) { - cc, err := Dial(addr, WithInsecure(), WithResolvers(r), WithDefaultServiceConfig(js)) + cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js)) if err != nil { t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) } @@ -1081,7 +1085,7 @@ func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T } func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) { - cc, err := Dial(addr, WithInsecure(), WithResolvers(r), WithDefaultServiceConfig(js)) + cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js)) if err != nil { t.Fatalf("Dial(%s, _) = _, %v, want _, ", addr, err) } diff --git a/dialoptions.go b/dialoptions.go index bdfc200e3bb..e7ac15ce4c7 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -20,12 +20,10 @@ package grpc import ( "context" - "fmt" "net" "time" "google.golang.org/grpc/backoff" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/channelz" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -46,18 +44,16 @@ type dialOptions struct { chainUnaryInts []UnaryClientInterceptor chainStreamInts []StreamClientInterceptor - cp Compressor - dc Decompressor - bs internalbackoff.Strategy - block bool - returnLastError bool - timeout time.Duration - scChan <-chan ServiceConfig - authority string - copts transport.ConnectOptions - callOptions []CallOption - // This is used by WithBalancerName dial option. - balancerBuilder balancer.Builder + cp Compressor + dc Decompressor + bs internalbackoff.Strategy + block bool + returnLastError bool + timeout time.Duration + scChan <-chan ServiceConfig + authority string + copts transport.ConnectOptions + callOptions []CallOption channelzParentID *channelz.Identifier disableServiceConfig bool disableRetry bool @@ -196,25 +192,6 @@ func WithDecompressor(dc Decompressor) DialOption { }) } -// WithBalancerName sets the balancer that the ClientConn will be initialized -// with. Balancer registered with balancerName will be used. This function -// panics if no balancer was registered by balancerName. -// -// The balancer cannot be overridden by balancer option specified by service -// config. -// -// Deprecated: use WithDefaultServiceConfig and WithDisableServiceConfig -// instead. Will be removed in a future 1.x release. -func WithBalancerName(balancerName string) DialOption { - builder := balancer.Get(balancerName) - if builder == nil { - panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName)) - } - return newFuncDialOption(func(o *dialOptions) { - o.balancerBuilder = builder - }) -} - // WithServiceConfig returns a DialOption which has a channel to read the // service configuration. // diff --git a/test/balancer_test.go b/test/balancer_test.go index 8fe1db32658..113fbaceafb 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -153,7 +153,7 @@ func (s) TestCredsBundleFromBalancer(t *testing.T) { te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: ""}) te.tapHandle = authHandle te.customDialOptions = []grpc.DialOption{ - grpc.WithBalancerName(testBalancerName), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName)), } creds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem")) if err != nil { @@ -188,7 +188,7 @@ func testPickExtraMetadata(t *testing.T, e env) { ) te.customDialOptions = []grpc.DialOption{ - grpc.WithBalancerName(testBalancerName), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName)), grpc.WithUserAgent(testUserAgent), } te.startServer(&testServer{security: e.security}) @@ -236,7 +236,7 @@ func testDoneInfo(t *testing.T, e env) { b := &testBalancer{} balancer.Register(b) te.customDialOptions = []grpc.DialOption{ - grpc.WithBalancerName(testBalancerName), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName)), } te.userAgent = failAppUA te.startServer(&testServer{security: e.security}) @@ -315,7 +315,7 @@ func testDoneLoads(t *testing.T) { return &testpb.Empty{}, nil }, } - if err := ss.Start(nil, grpc.WithBalancerName(testBalancerName)); err != nil { + if err := ss.Start(nil, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalancerName))); err != nil { t.Fatalf("error starting testing server: %v", err) } defer ss.Stop() @@ -393,8 +393,10 @@ func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) { b := newTestBalancerKeepAddresses() balancer.Register(b) - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r), - grpc.WithBalancerName(b.Name())) + cc, err := grpc.Dial(r.Scheme()+":///test.server", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, b.Name()))) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index cdf0434a121..f5ca011a39c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -812,7 +812,7 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) scheme = te.resolverScheme + ":///" } if te.e.balancer != "" { - opts = append(opts, grpc.WithBalancerName(te.e.balancer)) + opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer))) } if te.clientInitialWindowSize > 0 { opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) diff --git a/test/healthcheck_test.go b/test/healthcheck_test.go index 247ffea7c3c..abff2f56c43 100644 --- a/test/healthcheck_test.go +++ b/test/healthcheck_test.go @@ -49,7 +49,7 @@ func newTestHealthServer() *testHealthServer { return newTestHealthServerWithWatchFunc(defaultWatchFunc) } -func newTestHealthServerWithWatchFunc(f func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error) *testHealthServer { +func newTestHealthServerWithWatchFunc(f healthWatchFunc) *testHealthServer { return &testHealthServer{ watchFunc: f, update: make(chan struct{}, 1), @@ -83,9 +83,11 @@ func defaultWatchFunc(s *testHealthServer, in *healthpb.HealthCheckRequest, stre return nil } +type healthWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error + type testHealthServer struct { healthpb.UnimplementedHealthServer - watchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error + watchFunc healthWatchFunc mu sync.Mutex status map[string]healthpb.HealthCheckResponse_ServingStatus update chan struct{} @@ -125,25 +127,26 @@ func setupHealthCheckWrapper() (hcEnterChan chan struct{}, hcExitChan chan struc return } -type svrConfig struct { - specialWatchFunc func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error -} +func setupServer(t *testing.T, watchFunc healthWatchFunc) (*grpc.Server, net.Listener, *testHealthServer) { + t.Helper() -func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealthServer, deferFunc func(), err error) { - s = grpc.NewServer() - lis, err = net.Listen("tcp", "localhost:0") + lis, err := net.Listen("tcp", "localhost:0") if err != nil { - return nil, nil, nil, func() {}, fmt.Errorf("failed to listen due to err %v", err) + t.Fatalf("net.Listen() failed: %v", err) } - if sc.specialWatchFunc != nil { - ts = newTestHealthServerWithWatchFunc(sc.specialWatchFunc) + + var ts *testHealthServer + if watchFunc != nil { + ts = newTestHealthServerWithWatchFunc(watchFunc) } else { ts = newTestHealthServer() } + s := grpc.NewServer() healthgrpc.RegisterHealthServer(s, ts) testpb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) - return s, lis, ts, s.Stop, nil + t.Cleanup(func() { s.Stop() }) + return s, lis, ts } type clientConfig struct { @@ -152,28 +155,34 @@ type clientConfig struct { extraDialOption []grpc.DialOption } -func setupClient(c *clientConfig) (cc *grpc.ClientConn, r *manual.Resolver, deferFunc func(), err error) { - r = manual.NewBuilderWithScheme("whatever") - var opts []grpc.DialOption - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r), grpc.WithBalancerName(c.balancerName)) - if c.testHealthCheckFuncWrapper != nil { - opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper)) +func setupClient(t *testing.T, c *clientConfig) (*grpc.ClientConn, *manual.Resolver) { + t.Helper() + + r := manual.NewBuilderWithScheme("whatever") + opts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + } + if c != nil { + if c.balancerName != "" { + opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, c.balancerName))) + } + if c.testHealthCheckFuncWrapper != nil { + opts = append(opts, internal.WithHealthCheckFunc.(func(internal.HealthChecker) grpc.DialOption)(c.testHealthCheckFuncWrapper)) + } + opts = append(opts, c.extraDialOption...) } - opts = append(opts, c.extraDialOption...) - cc, err = grpc.Dial(r.Scheme()+":///test.server", opts...) - if err != nil { - return nil, nil, nil, fmt.Errorf("dial failed due to err: %v", err) + cc, err := grpc.Dial(r.Scheme()+":///test.server", opts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) } - return cc, r, func() { cc.Close() }, nil + t.Cleanup(func() { cc.Close() }) + return cc, r } func (s) TestHealthCheckWatchStateChange(t *testing.T) { - _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) - defer deferFunc() - if err != nil { - t.Fatal(err) - } + _, lis, ts := setupServer(t, nil) // The table below shows the expected series of addrConn connectivity transitions when server // updates its health status. As there's only one addrConn corresponds with the ClientConn in this @@ -189,20 +198,17 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) { //+------------------------------+-------------------------------------------+ ts.SetServingStatus("foo", healthpb.HealthCheckResponse_NOT_SERVING) - cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + cc, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok { t.Fatal("ClientConn is still in IDLE state when the context times out.") @@ -258,22 +264,18 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { go s.Serve(lis) defer s.Stop() - cc, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + cc, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok { t.Fatal("ClientConn is still in IDLE state when the context times out.") } @@ -288,37 +290,23 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) { // In the case of a goaway received, the health check stream should be terminated and health check // function should exit. func (s) TestHealthCheckWithGoAway(t *testing.T) { - hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - s, lis, ts, deferFunc, err := setupServer(&svrConfig{}) - defer deferFunc() - if err != nil { - t.Fatal(err) - } - + s, lis, ts := setupServer(t, nil) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) tc := testpb.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // make some rpcs to make sure connection is working. if err := verifyResultWithDelay(func() (bool, error) { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { @@ -380,33 +368,19 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) { } func (s) TestHealthCheckWithConnClose(t *testing.T) { - hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - s, lis, ts, deferFunc, err := setupServer(&svrConfig{}) - defer deferFunc() - if err != nil { - t.Fatal(err) - } - + s, lis, ts := setupServer(t, nil) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) tc := testpb.NewTestServiceClient(cc) - r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -444,30 +418,17 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) { // addrConn drain happens when addrConn gets torn down due to its address being no longer in the // address list returned by the resolver. func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { - hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) - defer deferFunc() - if err != nil { - t.Fatal(err) - } - + _, lis, ts := setupServer(t, nil) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) tc := testpb.NewTestServiceClient(cc) sc := parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, @@ -537,32 +498,19 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) { // ClientConn close will lead to its addrConns being torn down. func (s) TestHealthCheckWithClientConnClose(t *testing.T) { - hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) - defer deferFunc() - if err != nil { - t.Fatal(err) - } - + _, lis, ts := setupServer(t, nil) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) tc := testpb.NewTestServiceClient(cc) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -602,39 +550,25 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) { // closes the skipReset channel(since it has not been closed inside health check func) to unblock // onGoAway/onClose goroutine. func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *testing.T) { - hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - _, lis, ts, deferFunc, err := setupServer(&svrConfig{ - specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "delay" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"delay\"") - } - // Do nothing to mock a delay of health check response from server side. - // This case is to help with the test that covers the condition that setConnectivityState is not - // called inside HealthCheckFunc before the func returns. - select { - case <-stream.Context().Done(): - case <-time.After(5 * time.Second): - } - return nil - }, - }) - defer deferFunc() - if err != nil { - t.Fatal(err) + watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "delay" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"delay\"") + } + // Do nothing to mock a delay of health check response from server side. + // This case is to help with the test that covers the condition that setConnectivityState is not + // called inside HealthCheckFunc before the func returns. + select { + case <-stream.Context().Done(): + case <-time.After(5 * time.Second): + } + return nil } - + _, lis, ts := setupServer(t, watchFunc) ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING) - _, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + _, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) // The serviceName "delay" is specially handled at server side, where response will not be sent // back to client immediately upon receiving the request (client should receive no response until @@ -642,7 +576,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes sc := parseCfg(r, `{ "healthCheckConfig": { "serviceName": "delay" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, @@ -678,39 +613,25 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes // closes the allowedToReset channel(since it has not been closed inside health check func) to unblock // onGoAway/onClose goroutine. func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { - hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - s, lis, ts, deferFunc, err := setupServer(&svrConfig{ - specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "delay" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"delay\"") - } - // Do nothing to mock a delay of health check response from server side. - // This case is to help with the test that covers the condition that setConnectivityState is not - // called inside HealthCheckFunc before the func returns. - select { - case <-stream.Context().Done(): - case <-time.After(5 * time.Second): - } - return nil - }, - }) - defer deferFunc() - if err != nil { - t.Fatal(err) + watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "delay" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"delay\"") + } + // Do nothing to mock a delay of health check response from server side. + // This case is to help with the test that covers the condition that setConnectivityState is not + // called inside HealthCheckFunc before the func returns. + select { + case <-stream.Context().Done(): + case <-time.After(5 * time.Second): + } + return nil } - + s, lis, ts := setupServer(t, watchFunc) ts.SetServingStatus("delay", healthpb.HealthCheckResponse_SERVING) - _, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() + hcEnterChan, hcExitChan, testHealthCheckFuncWrapper := setupHealthCheckWrapper() + _, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) // The serviceName "delay" is specially handled at server side, where response will not be sent // back to client immediately upon receiving the request (client should receive no response until @@ -720,7 +641,8 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "delay" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) select { @@ -750,25 +672,18 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalled(t *testing.T) { func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", + cc, r := setupClient(t, &clientConfig{ testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, extraDialOption: []grpc.DialOption{grpc.WithDisableHealthCheck()}, }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - tc := testpb.NewTestServiceClient(cc) - r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -792,24 +707,17 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) { func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "pick_first", + cc, r := setupClient(t, &clientConfig{ testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - tc := testpb.NewTestServiceClient(cc) - r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: addr}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "foo" - } + }, + "loadBalancingConfig": [{"pick_first":{}}] }`)}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -833,18 +741,8 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) { func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) { hcEnterChan, _, testHealthCheckFuncWrapper := setupHealthCheckWrapper() - - cc, r, deferFunc, err := setupClient(&clientConfig{ - balancerName: "round_robin", - testHealthCheckFuncWrapper: testHealthCheckFuncWrapper, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - + cc, r := setupClient(t, &clientConfig{testHealthCheckFuncWrapper: testHealthCheckFuncWrapper}) tc := testpb.NewTestServiceClient(cc) - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr}}}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -867,11 +765,7 @@ func testHealthCheckDisableWithServiceConfig(t *testing.T, addr string) { } func (s) TestHealthCheckDisable(t *testing.T) { - _, lis, ts, deferFunc, err := setupServer(&svrConfig{}) - defer deferFunc() - if err != nil { - t.Fatal(err) - } + _, lis, ts := setupServer(t, nil) ts.SetServingStatus("foo", healthpb.HealthCheckResponse_SERVING) // test client side disabling configuration. @@ -881,32 +775,23 @@ func (s) TestHealthCheckDisable(t *testing.T) { } func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { - _, lis, _, deferFunc, err := setupServer(&svrConfig{ - specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "channelzSuccess" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"channelzSuccess\"") - } - return status.Error(codes.OK, "fake success") - }, - }) - defer deferFunc() - if err != nil { - t.Fatal(err) - } - - _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) - if err != nil { - t.Fatal(err) + watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "channelzSuccess" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"channelzSuccess\"") + } + return status.Error(codes.OK, "fake success") } - defer deferFunc() + _, lis, _ := setupServer(t, watchFunc) + _, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "channelzSuccess" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) if err := verifyResultWithDelay(func() (bool, error) { @@ -937,32 +822,23 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) { } func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) { - _, lis, _, deferFunc, err := setupServer(&svrConfig{ - specialWatchFunc: func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { - if in.Service != "channelzFailure" { - return status.Error(codes.FailedPrecondition, - "this special Watch function only handles request with service name to be \"channelzFailure\"") - } - return status.Error(codes.Internal, "fake failure") - }, - }) - if err != nil { - t.Fatal(err) - } - defer deferFunc() - - _, r, deferFunc, err := setupClient(&clientConfig{balancerName: "round_robin"}) - if err != nil { - t.Fatal(err) + watchFunc := func(s *testHealthServer, in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error { + if in.Service != "channelzFailure" { + return status.Error(codes.FailedPrecondition, + "this special Watch function only handles request with service name to be \"channelzFailure\"") + } + return status.Error(codes.Internal, "fake failure") } - defer deferFunc() + _, lis, _ := setupServer(t, watchFunc) + _, r := setupClient(t, nil) r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, ServiceConfig: parseCfg(r, `{ "healthCheckConfig": { "serviceName": "channelzFailure" - } + }, + "loadBalancingConfig": [{"round_robin":{}}] }`)}) if err := verifyResultWithDelay(func() (bool, error) {