Skip to content

Commit

Permalink
Add dial option to set balancer (#1697)
Browse files Browse the repository at this point in the history
WithBalancerName dial option specifies the name of the balancer to be used by the ClientConn. Service config updates can NOT override the balancer option.
  • Loading branch information
menghanl committed Dec 18, 2017
1 parent 6610f9a commit e6549e6
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 45 deletions.
5 changes: 4 additions & 1 deletion balancer/roundrobin/roundrobin.go
Expand Up @@ -31,9 +31,12 @@ import (
"google.golang.org/grpc/resolver"
)

// Name is the name of round_robin balancer.
const Name = "round_robin"

// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{})
return base.NewBalancerBuilder(Name, &rrPickerBuilder{})
}

func init() {
Expand Down
18 changes: 8 additions & 10 deletions balancer/roundrobin/roundrobin_test.go
Expand Up @@ -27,7 +27,7 @@ import (

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/peer"
Expand All @@ -38,8 +38,6 @@ import (
"google.golang.org/grpc/test/leakcheck"
)

var rr = balancer.Get("round_robin")

type testServer struct {
testpb.TestServiceServer
}
Expand Down Expand Up @@ -103,7 +101,7 @@ func TestOneBackend(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -135,7 +133,7 @@ func TestBackendsRoundRobin(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -194,7 +192,7 @@ func TestAddressesRemoved(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -236,7 +234,7 @@ func TestCloseWithPendingRPC(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -270,7 +268,7 @@ func TestNewAddressWhileBlocking(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr))
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -319,7 +317,7 @@ func TestOneServerDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -417,7 +415,7 @@ func TestAllServersDown(t *testing.T) {
}
defer test.cleanup()

cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake())
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake())
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
37 changes: 33 additions & 4 deletions balancer_switching_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -132,6 +133,34 @@ func TestSwitchBalancer(t *testing.T) {
}
}

// Test that balancer specified by dial option will not be overridden.
func TestBalancerDialOption(t *testing.T) {
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

numServers := 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}

// First addr update contains grpclb.
func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
defer leakcheck.Check(t)
Expand Down Expand Up @@ -182,7 +211,7 @@ func TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: "backend"}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down Expand Up @@ -210,7 +239,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down Expand Up @@ -258,7 +287,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: "backend"}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down Expand Up @@ -352,7 +381,7 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == pickfirstName
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
Expand Down
40 changes: 28 additions & 12 deletions clientconn.go
Expand Up @@ -95,7 +95,8 @@ type dialOptions struct {
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
// This is to support v1 balancer.
// This is used by v1 balancer dial option WithBalancer to support v1
// balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
// This is to support grpclb.
resolverBuilder resolver.Builder
Expand Down Expand Up @@ -200,7 +201,8 @@ func WithDecompressor(dc Decompressor) DialOption {

// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
// Deprecated: use the new balancer APIs in balancer package instead.
//
// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
Expand All @@ -209,12 +211,21 @@ func WithBalancer(b Balancer) DialOption {
}
}

// WithBalancerBuilder is for testing only. Users using custom balancers should
// register their balancer and use service config to choose the balancer to use.
func WithBalancerBuilder(b balancer.Builder) DialOption {
// TODO(bar) remove this when switching balancer is done.
// WithBalancerName sets the balancer that the ClientConn will be initialized
// with. Balancer registered with balancerName will be used. This function
// panics if no balancer was registered by balancerName.
//
// The balancer cannot be overridden by balancer option specified by service
// config.
//
// This is an EXPERIMENTAL API.
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return func(o *dialOptions) {
o.balancerBuilder = b
o.balancerBuilder = builder
}
}

Expand Down Expand Up @@ -670,9 +681,9 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {

cc.curAddresses = addrs

if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil {
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
} else {
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
Expand All @@ -697,11 +708,16 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
// - the first time handling non-grpclb addresses
// (curBalancerName="grpclb", preBalancerName="")
if newBalancerName == "" {
newBalancerName = pickfirstName
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}

cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
}

Expand All @@ -724,7 +740,7 @@ func (cc *ClientConn) switchBalancer(name string) {

grpclog.Infof("ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead")
grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
return
}
// TODO(bar switching) change this to two steps: drain and close.
Expand Down
4 changes: 4 additions & 0 deletions grpclb.go
Expand Up @@ -83,6 +83,10 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
return m, nil
}

func init() {
balancer.Register(newLBBuilder())
}

// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
Expand Down
21 changes: 20 additions & 1 deletion grpclb/grpclb_test.go
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
Expand Down Expand Up @@ -573,6 +574,24 @@ func TestBalancerDisconnects(t *testing.T) {
t.Fatalf("No RPC sent to second backend after 1 second")
}

type customGRPCLBBuilder struct {
balancer.Builder
name string
}

func (b *customGRPCLBBuilder) Name() string {
return b.name
}

const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"

func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}

func TestFallback(t *testing.T) {
defer leakcheck.Check(t)

Expand Down Expand Up @@ -611,7 +630,7 @@ func TestFallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)),
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down
2 changes: 1 addition & 1 deletion grpclb_remote_balancer.go
Expand Up @@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
dopts = append(dopts, withContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder()))
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.
Expand Down
5 changes: 3 additions & 2 deletions pickfirst.go
Expand Up @@ -26,7 +26,8 @@ import (
"google.golang.org/grpc/resolver"
)

const pickfirstName = "pick_first"
// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"

func newPickfirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
Expand All @@ -39,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
}

func (*pickfirstBuilder) Name() string {
return pickfirstName
return PickFirstBalancerName
}

type pickfirstBalancer struct {
Expand Down

0 comments on commit e6549e6

Please sign in to comment.