Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: delete deprecated API WithBalancerName() #5232

Merged
merged 1 commit into from Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 8 additions & 35 deletions balancer_switching_test.go
Expand Up @@ -26,7 +26,7 @@ import (
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s) TestSwitchBalancer(t *testing.T) {
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand All @@ -184,38 +184,11 @@ func (s) TestSwitchBalancer(t *testing.T) {
}
}

// Test that balancer specified by dial option will not be overridden.
func (s) TestBalancerDialOption(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

const numServers = 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}

// First addr update contains grpclb.
func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -275,7 +248,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -351,7 +324,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -413,7 +386,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -503,7 +476,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -550,7 +523,7 @@ func init() {
// This test is to make sure this close doesn't cause a deadlock.
func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
51 changes: 20 additions & 31 deletions clientconn.go
Expand Up @@ -674,7 +674,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
}

var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}

Expand Down Expand Up @@ -714,10 +714,6 @@ func (cc *ClientConn) switchBalancer(name string) {
}

channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
return
}
if cc.balancerWrapper != nil {
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
Expand Down Expand Up @@ -999,35 +995,28 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}

if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down
99 changes: 43 additions & 56 deletions clientconn_state_transition_test.go
Expand Up @@ -20,6 +20,7 @@ package grpc

import (
"context"
"fmt"
"net"
"sync"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -141,9 +143,6 @@ client enters TRANSIENT FAILURE.`,
}

func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

pl := testutils.NewPipeListener()
defer pl.Close()

Expand All @@ -156,10 +155,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
connMu.Unlock()
}()

client, err := DialContext(ctx,
"",
WithInsecure(),
WithBalancerName(stateRecordingBalancerName),
client, err := Dial("",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithDialer(pl.Dialer()),
withBackoff(noBackoff{}),
withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 }))
Expand All @@ -170,12 +168,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
go stayConnected(client)

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(5 * time.Second)

for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
Expand All @@ -196,16 +191,6 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s

// When a READY connection is closed, the client enters IDLE then CONNECTING.
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -237,7 +222,9 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
conn.Close()
}()

client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
client, err := Dial(lis.Addr().String(),
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
if err != nil {
t.Fatal(err)
}
Expand All @@ -246,11 +233,15 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(5 * time.Second)

want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
Expand All @@ -266,14 +257,6 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
// When the first connection is closed, the client stays in CONNECTING until it
// tries the second address (which succeeds, and then it enters READY).
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -324,19 +307,25 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
client, err := Dial("whatever:///this-gets-overwritten",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(5 * time.Second)

want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
Expand All @@ -345,12 +334,12 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
}
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
case <-server2Done:
}
Expand All @@ -359,16 +348,6 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter CONNECTING
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -414,20 +393,28 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
client, err := Dial("whatever:///this-gets-overwritten",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()
go stayConnected(client)

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(2 * time.Second)

want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
Expand All @@ -439,7 +426,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
}
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
Expand Down