Skip to content

Commit

Permalink
Add withbalancername dial option
Browse files Browse the repository at this point in the history
service config will not override dial option balancer
fix comments
Add const roundrobin.Name and PickFirstBalancerName.
remove WithBalancerBuilder
fix tests
delete withBalancerBuilder
  • Loading branch information
menghanl committed Dec 12, 2017
1 parent bf35f1b commit 1b7bf44
Show file tree
Hide file tree
Showing 10 changed files with 107 additions and 46 deletions.
5 changes: 4 additions & 1 deletion balancer/roundrobin/roundrobin.go
Expand Up @@ -31,9 +31,12 @@ import (
"google.golang.org/grpc/resolver"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{})
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
Expand Down
18 changes: 8 additions & 10 deletions balancer/roundrobin/roundrobin_test.go
Expand Up @@ -27,7 +27,7 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/peer"
Expand All @@ -37,8 +37,6 @@ import (
"google.golang.org/grpc/test/leakcheck"
)

var rr = balancer.Get("round_robin")

type testServer struct {
testpb.TestServiceServer
}
Expand Down Expand Up @@ -102,7 +100,7 @@ func TestOneBackend(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -134,7 +132,7 @@ func TestBackendsRoundRobin(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -193,7 +191,7 @@ func TestAddressesRemoved(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -235,7 +233,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -269,7 +267,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -318,7 +316,7 @@ func TestOneServerDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -415,7 +413,7 @@ func TestAllServersDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
29 changes: 29 additions & 0 deletions balancer_switching_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -131,3 +132,31 @@ func TestSwitchBalancer(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
}

// Test that balancer specified by dial option will not be overridden.
func TestBalancerDialOption(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

numServers := 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}
31 changes: 21 additions & 10 deletions clientconn.go
Expand Up @@ -95,8 +95,10 @@ type dialOptions struct {
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
// This is to support v1 balancer.
// This is used by v1 balancer dial option WithBalancer to support v1 balancer.
balancerBuilder balancer.Builder
// The balancer to be used. Can not be overridden by service config.
balancerName string
// This is to support grpclb.
resolverBuilder resolver.Builder
waitForHandshake bool
Expand Down Expand Up @@ -198,7 +200,8 @@ func WithDecompressor(dc Decompressor) DialOption {

// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
// Deprecated: use the new balancer APIs in balancer package instead.
//
// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
Expand All @@ -207,12 +210,17 @@ func WithBalancer(b Balancer) DialOption {
}
}

// WithBalancerBuilder is for testing only. Users using custom balancers should
// register their balancer and use service config to choose the balancer to use.
func WithBalancerBuilder(b balancer.Builder) DialOption {
// TODO(bar) remove this when switching balancer is done.
// WithBalancerName sets the balancer that the ClientConn will be initialized
// with. Balancer registered with balancerName will be used. If no balancer was
// registered by balancerName, pick_first will be used.
//
// The balancer cannot be overridden by balancer option specified by service
// config.
//
// This is an EXPERIMENTAL API.
func WithBalancerName(balancerName string) DialOption {
return func(o *dialOptions) {
o.balancerBuilder = b
o.balancerName = balancerName
}
}

Expand Down Expand Up @@ -661,7 +669,10 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
if cc.balancerWrapper == nil {
// First time handling resolved addresses. Build a balancer use either
// the builder specified by dial option, or pickfirst.
builder := cc.dopts.balancerBuilder
builder := balancer.Get(cc.dopts.balancerName)
if builder == nil && cc.dopts.balancerBuilder != nil {
builder = cc.dopts.balancerBuilder
}
if builder == nil {
// No customBalancer was specified by DialOption, and this is the first
// time handling resolved addresses, create a pickfirst balancer.
Expand All @@ -684,8 +695,8 @@ func (cc *ClientConn) switchBalancer(name string) {
}
grpclog.Infof("ClientConn switching balancer to %q", name)

if cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead")
if cc.dopts.balancerName != "" || cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring service config balancer configuration: Balancer DialOption used instead")
return
}

Expand Down
9 changes: 6 additions & 3 deletions grpclb.go
Expand Up @@ -82,9 +82,12 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
return m, nil
}

// NewLBBuilder creates a builder for grpclb. For testing only.
func NewLBBuilder() balancer.Builder {
// TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb.
func init() {
balancer.Register(newLBBuilder())
}

// newLBBuilder creates a builder for grpclb. For testing only.
func newLBBuilder() balancer.Builder {
return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}

Expand Down
31 changes: 25 additions & 6 deletions grpclb/grpclb_test.go
Expand Up @@ -37,6 +37,7 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
Expand Down Expand Up @@ -344,7 +345,7 @@ func TestGRPCLB(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
grpc.WithBalancerName("grpclb"),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -396,7 +397,7 @@ func TestGRPCLBWeighted(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
grpc.WithBalancerName("grpclb"),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -462,7 +463,7 @@ func TestDropRequest(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
grpc.WithBalancerName("grpclb"),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -537,7 +538,7 @@ func TestBalancerDisconnects(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
grpc.WithBalancerName("grpclb"),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -578,6 +579,24 @@ func TestBalancerDisconnects(t *testing.T) {
t.Fatalf("No RPC sent to second backend after 1 second")
}

type customGRPCLBBuilder struct {
balancer.Builder
name string
}

func (b *customGRPCLBBuilder) Name() string {
return b.name
}

const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"

func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}

func TestFallback(t *testing.T) {
defer leakcheck.Check(t)

Expand Down Expand Up @@ -616,7 +635,7 @@ func TestFallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)),
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -710,7 +729,7 @@ func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilder()),
grpc.WithBalancerName("grpclb"),
grpc.WithTransportCredentials(&creds),
grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithDialer(fakeNameDialer))
Expand Down
2 changes: 1 addition & 1 deletion grpclb_remote_balancer.go
Expand Up @@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder()))
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.
Expand Down
5 changes: 4 additions & 1 deletion pickfirst.go
Expand Up @@ -26,6 +26,9 @@ import (
"google.golang.org/grpc/resolver"
)

// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"

func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
}
Expand All @@ -37,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
}

func (*pickfirstBuilder) Name() string {
return "pick_first"
return PickFirstBalancerName
}

type pickfirstBalancer struct {
Expand Down
14 changes: 7 additions & 7 deletions pickfirst_test.go
Expand Up @@ -40,7 +40,7 @@ func TestOneBackendPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestBackendsPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) {
_, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestOneServerDownPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestAllServersDownPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestAddressesRemovedPickfirst(t *testing.T) {
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down

0 comments on commit 1b7bf44

Please sign in to comment.