From e6549e636da11b0ca0379af8bdb7d6d22ea80893 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 18 Dec 2017 15:35:42 -0800 Subject: [PATCH] Add dial option to set balancer (#1697) WithBalancerName dial option specifies the name of the balancer to be used by the ClientConn. Service config updates can NOT override the balancer option. --- balancer/roundrobin/roundrobin.go | 5 +++- balancer/roundrobin/roundrobin_test.go | 18 ++++++------ balancer_switching_test.go | 37 +++++++++++++++++++++--- clientconn.go | 40 ++++++++++++++++++-------- grpclb.go | 4 +++ grpclb/grpclb_test.go | 21 +++++++++++++- grpclb_remote_balancer.go | 2 +- pickfirst.go | 5 ++-- pickfirst_test.go | 14 ++++----- test/end2end_test.go | 9 ++---- 10 files changed, 110 insertions(+), 45 deletions(-) diff --git a/balancer/roundrobin/roundrobin.go b/balancer/roundrobin/roundrobin.go index 8ef14894801..2eda0a1c210 100644 --- a/balancer/roundrobin/roundrobin.go +++ b/balancer/roundrobin/roundrobin.go @@ -31,9 +31,12 @@ import ( "google.golang.org/grpc/resolver" ) +// Name is the name of round_robin balancer. +const Name = "round_robin" + // newBuilder creates a new roundrobin balancer builder. func newBuilder() balancer.Builder { - return base.NewBalancerBuilder("round_robin", &rrPickerBuilder{}) + return base.NewBalancerBuilder(Name, &rrPickerBuilder{}) } func init() { diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index f1b8c4b3193..59cac4b1bc3 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -27,7 +27,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/peer" @@ -38,8 +38,6 @@ import ( "google.golang.org/grpc/test/leakcheck" ) -var rr = balancer.Get("round_robin") - type testServer struct { testpb.TestServiceServer } @@ -103,7 +101,7 @@ func TestOneBackend(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -135,7 +133,7 @@ func TestBackendsRoundRobin(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -194,7 +192,7 @@ func TestAddressesRemoved(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -236,7 +234,7 @@ func TestCloseWithPendingRPC(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -270,7 +268,7 @@ func TestNewAddressWhileBlocking(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr)) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name)) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -319,7 +317,7 @@ func TestOneServerDown(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake()) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -417,7 +415,7 @@ func TestAllServersDown(t *testing.T) { } defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(rr), grpc.WithWaitForHandshake()) + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name), grpc.WithWaitForHandshake()) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 5274cd1f702..0d8b2a54583 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -25,6 +25,7 @@ import ( "time" "golang.org/x/net/context" + "google.golang.org/grpc/balancer/roundrobin" _ "google.golang.org/grpc/grpclog/glogger" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -132,6 +133,34 @@ func TestSwitchBalancer(t *testing.T) { } } +// Test that balancer specified by dial option will not be overridden. +func TestBalancerDialOption(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + numServers := 2 + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) + defer scleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name)) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + r.NewAddress([]resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}) + // The init balancer is roundrobin. + if err := checkRoundRobin(cc, servers); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } + // Switch to pickfirst. + cc.handleServiceConfig(`{"loadBalancingPolicy": "pick_first"}`) + // Balancer is still roundrobin. + if err := checkRoundRobin(cc, servers); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } +} + // First addr update contains grpclb. func TestSwitchBalancerGRPCLBFirst(t *testing.T) { defer leakcheck.Check(t) @@ -182,7 +211,7 @@ func TestSwitchBalancerGRPCLBFirst(t *testing.T) { r.NewAddress([]resolver.Address{{Addr: "backend"}}) for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break @@ -210,7 +239,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) { var isPickFirst bool for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break @@ -258,7 +287,7 @@ func TestSwitchBalancerGRPCLBSecond(t *testing.T) { r.NewAddress([]resolver.Address{{Addr: "backend"}}) for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break @@ -352,7 +381,7 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { var isPickFirst bool for i := 0; i < 5000; i++ { cc.mu.Lock() - isPickFirst = cc.curBalancerName == pickfirstName + isPickFirst = cc.curBalancerName == PickFirstBalancerName cc.mu.Unlock() if isPickFirst { break diff --git a/clientconn.go b/clientconn.go index a2dd20f9a6d..1d079eb2585 100644 --- a/clientconn.go +++ b/clientconn.go @@ -95,7 +95,8 @@ type dialOptions struct { scChan <-chan ServiceConfig copts transport.ConnectOptions callOptions []CallOption - // This is to support v1 balancer. + // This is used by v1 balancer dial option WithBalancer to support v1 + // balancer, and also by WithBalancerName dial option. balancerBuilder balancer.Builder // This is to support grpclb. resolverBuilder resolver.Builder @@ -200,7 +201,8 @@ func WithDecompressor(dc Decompressor) DialOption { // WithBalancer returns a DialOption which sets a load balancer with the v1 API. // Name resolver will be ignored if this DialOption is specified. -// Deprecated: use the new balancer APIs in balancer package instead. +// +// Deprecated: use the new balancer APIs in balancer package and WithBalancerName. func WithBalancer(b Balancer) DialOption { return func(o *dialOptions) { o.balancerBuilder = &balancerWrapperBuilder{ @@ -209,12 +211,21 @@ func WithBalancer(b Balancer) DialOption { } } -// WithBalancerBuilder is for testing only. Users using custom balancers should -// register their balancer and use service config to choose the balancer to use. -func WithBalancerBuilder(b balancer.Builder) DialOption { - // TODO(bar) remove this when switching balancer is done. +// WithBalancerName sets the balancer that the ClientConn will be initialized +// with. Balancer registered with balancerName will be used. This function +// panics if no balancer was registered by balancerName. +// +// The balancer cannot be overridden by balancer option specified by service +// config. +// +// This is an EXPERIMENTAL API. +func WithBalancerName(balancerName string) DialOption { + builder := balancer.Get(balancerName) + if builder == nil { + panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName)) + } return func(o *dialOptions) { - o.balancerBuilder = b + o.balancerBuilder = builder } } @@ -670,9 +681,9 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { cc.curAddresses = addrs - if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil { - cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) - } else { + if cc.dopts.balancerBuilder == nil { + // Only look at balancer types and switch balancer if balancer dial + // option is not set. var isGRPCLB bool for _, a := range addrs { if a.Type == resolver.GRPCLB { @@ -697,11 +708,16 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { // - the first time handling non-grpclb addresses // (curBalancerName="grpclb", preBalancerName="") if newBalancerName == "" { - newBalancerName = pickfirstName + newBalancerName = PickFirstBalancerName } } cc.switchBalancer(newBalancerName) + } else if cc.balancerWrapper == nil { + // Balancer dial option was set, and this is the first time handling + // resolved addresses. Build a balancer with dopts.balancerBuilder. + cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) } + cc.balancerWrapper.handleResolvedAddrs(addrs, nil) } @@ -724,7 +740,7 @@ func (cc *ClientConn) switchBalancer(name string) { grpclog.Infof("ClientConn switching balancer to %q", name) if cc.dopts.balancerBuilder != nil { - grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead") + grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") return } // TODO(bar switching) change this to two steps: drain and close. diff --git a/grpclb.go b/grpclb.go index 0750fc1ecea..90564f4e675 100644 --- a/grpclb.go +++ b/grpclb.go @@ -83,6 +83,10 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { return m, nil } +func init() { + balancer.Register(newLBBuilder()) +} + // newLBBuilder creates a builder for grpclb. func newLBBuilder() balancer.Builder { return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout) diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index f09b0063a64..d83ea6b8817 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -36,6 +36,7 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" @@ -573,6 +574,24 @@ func TestBalancerDisconnects(t *testing.T) { t.Fatalf("No RPC sent to second backend after 1 second") } +type customGRPCLBBuilder struct { + balancer.Builder + name string +} + +func (b *customGRPCLBBuilder) Name() string { + return b.name +} + +const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout" + +func init() { + balancer.Register(&customGRPCLBBuilder{ + Builder: grpc.NewLBBuilderWithFallbackTimeout(100 * time.Millisecond), + name: grpclbCustomFallbackName, + }) +} + func TestFallback(t *testing.T) { defer leakcheck.Check(t) @@ -611,7 +630,7 @@ func TestFallback(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilderWithFallbackTimeout(100*time.Millisecond)), + grpc.WithBalancerName(grpclbCustomFallbackName), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) diff --git a/grpclb_remote_balancer.go b/grpclb_remote_balancer.go index 50d4ccd3a3a..1b580df26dd 100644 --- a/grpclb_remote_balancer.go +++ b/grpclb_remote_balancer.go @@ -241,7 +241,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { dopts = append(dopts, withContextDialer(lb.opt.Dialer)) } // Explicitly set pickfirst as the balancer. - dopts = append(dopts, WithBalancerBuilder(newPickfirstBuilder())) + dopts = append(dopts, WithBalancerName(PickFirstBalancerName)) dopts = append(dopts, withResolverBuilder(lb.manualResolver)) // Dial using manualResolver.Scheme, which is a random scheme generated // when init grpclb. The target name is not important. diff --git a/pickfirst.go b/pickfirst.go index 69623c4b1c8..bf659d49d2f 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -26,7 +26,8 @@ import ( "google.golang.org/grpc/resolver" ) -const pickfirstName = "pick_first" +// PickFirstBalancerName is the name of the pick_first balancer. +const PickFirstBalancerName = "pick_first" func newPickfirstBuilder() balancer.Builder { return &pickfirstBuilder{} @@ -39,7 +40,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions } func (*pickfirstBuilder) Name() string { - return pickfirstName + return PickFirstBalancerName } type pickfirstBalancer struct { diff --git a/pickfirst_test.go b/pickfirst_test.go index a9e617e26e8..2f85febff1b 100644 --- a/pickfirst_test.go +++ b/pickfirst_test.go @@ -48,7 +48,7 @@ func TestOneBackendPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -82,7 +82,7 @@ func TestBackendsPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -116,7 +116,7 @@ func TestNewAddressWhileBlockingPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -153,7 +153,7 @@ func TestCloseWithPendingRPCPickfirst(t *testing.T) { _, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -190,7 +190,7 @@ func TestOneServerDownPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}), WithWaitForHandshake()) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake()) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -232,7 +232,7 @@ func TestAllServersDownPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{}), WithWaitForHandshake()) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake()) if err != nil { t.Fatalf("failed to dial: %v", err) } @@ -276,7 +276,7 @@ func TestAddressesRemovedPickfirst(t *testing.T) { servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() - cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithBalancerBuilder(newPickfirstBuilder()), WithCodec(testCodec{})) + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) if err != nil { t.Fatalf("failed to dial: %v", err) } diff --git a/test/end2end_test.go b/test/end2end_test.go index bebbfba6884..21c0cee3682 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -46,8 +46,7 @@ import ( "golang.org/x/net/http2" spb "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" - _ "google.golang.org/grpc/balancer/roundrobin" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -694,11 +693,7 @@ func (te *test) clientConn() *grpc.ClientConn { case "v1": opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) case "round_robin": - rr := balancer.Get("round_robin") - if rr == nil { - te.t.Fatalf("got nil when trying to get roundrobin balancer builder") - } - opts = append(opts, grpc.WithBalancerBuilder(rr)) + opts = append(opts, grpc.WithBalancerName(roundrobin.Name)) } if te.clientInitialWindowSize > 0 { opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))