Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dial option to set balancer #1697

Merged
merged 3 commits into from Dec 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion balancer/roundrobin/roundrobin.go
Expand Up @@ -31,9 +31,12 @@ import (
"google.golang.org/grpc/resolver"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{})
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
Expand Down
18 changes: 8 additions & 10 deletions balancer/roundrobin/roundrobin_test.go
Expand Up @@ -27,7 +27,7 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/peer"
Expand All @@ -37,8 +37,6 @@ import (
"google.golang.org/grpc/test/leakcheck"
)

var rr = balancer.Get("round_robin")

type testServer struct {
testpb.TestServiceServer
}
Expand Down Expand Up @@ -102,7 +100,7 @@ func TestOneBackend(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -134,7 +132,7 @@ func TestBackendsRoundRobin(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -193,7 +191,7 @@ func TestAddressesRemoved(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -235,7 +233,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -269,7 +267,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -318,7 +316,7 @@ func TestOneServerDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -416,7 +414,7 @@ func TestAllServersDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
37 changes: 33 additions & 4 deletions balancer_switching_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -132,6 +133,34 @@ func TestSwitchBalancer(t *testing.T) {
}
}

// Test that balancer specified by dial option will not be overridden.
func TestBalancerDialOption(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

numServers := 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}

// First addr update contains grpclb.
func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
defer leakcheck.Check(t)
Expand Down Expand Up @@ -182,7 +211,7 @@ func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: "backend"}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down Expand Up @@ -210,7 +239,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down Expand Up @@ -258,7 +287,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: "backend"}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down Expand Up @@ -352,7 +381,7 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down
40 changes: 28 additions & 12 deletions clientconn.go
Expand Up @@ -95,7 +95,8 @@ type dialOptions struct {
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
// This is to support v1 balancer.
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
Expand Down Expand Up @@ -200,7 +201,8 @@ func WithDecompressor(dc Decompressor) DialOption {

// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
// Deprecated: use the new balancer APIs in balancer package instead.
//
// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
Expand All @@ -209,12 +211,21 @@ func WithBalancer(b Balancer) DialOption {
}
}

// WithBalancerBuilder is for testing only. Users using custom balancers should
// register their balancer and use service config to choose the balancer to use.
func WithBalancerBuilder(b balancer.Builder) DialOption {
// TODO(bar) remove this when switching balancer is done.
// WithBalancerName sets the balancer that the ClientConn will be initialized
// with. Balancer registered with balancerName will be used. This function
// panics if no balancer was registered by balancerName.
//
// The balancer cannot be overridden by balancer option specified by service
// config.
//
// This is an EXPERIMENTAL API.
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return func(o *dialOptions) {
o.balancerBuilder = b
o.balancerBuilder = builder
}
}

Expand Down Expand Up @@ -670,9 +681,9 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {

cc.curAddresses = addrs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dfawley I updated the following if else while doing a rebase. Please take another look. Thanks!

if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
} else {
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
Expand All @@ -697,11 +708,16 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
// - the first time handling non-grpclb addresses
// (curBalancerName="grpclb", preBalancerName="")
if newBalancerName == "" {
newBalancerName = pickfirstName
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}

cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
}

Expand All @@ -724,7 +740,7 @@ func (cc *ClientConn) switchBalancer(name string) {

grpclog.Infof("ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead")
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
return
}
// TODO(bar switching) change this to two steps: drain and close.
Expand Down
4 changes: 4 additions & 0 deletions grpclb.go
Expand Up @@ -83,6 +83,10 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
return m, nil
}

func init() {
balancer.Register(newLBBuilder())
}

// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
Expand Down
21 changes: 20 additions & 1 deletion grpclb/grpclb_test.go
Expand Up @@ -37,6 +37,7 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
Expand Down Expand Up @@ -574,6 +575,24 @@ func TestBalancerDisconnects(t *testing.T) {
t.Fatalf("No RPC sent to second backend after 1 second")
}

type customGRPCLBBuilder struct {
balancer.Builder
name string
}

func (b *customGRPCLBBuilder) Name() string {
return b.name
}

const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"

func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}

func TestFallback(t *testing.T) {
defer leakcheck.Check(t)

Expand Down Expand Up @@ -612,7 +631,7 @@ func TestFallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)),
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down
2 changes: 1 addition & 1 deletion grpclb_remote_balancer.go
Expand Up @@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder()))
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.
Expand Down
5 changes: 3 additions & 2 deletions pickfirst.go
Expand Up @@ -26,7 +26,8 @@ import (
"google.golang.org/grpc/resolver"
)

const pickfirstName = "pick_first"
// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"

func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
Expand All @@ -39,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
}

func (*pickfirstBuilder) Name() string {
return pickfirstName
return PickFirstBalancerName
}

type pickfirstBalancer struct {
Expand Down