Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: move to TransientFailure in pick_first LB policy when all addresses are removed #5274

Merged
merged 5 commits into from Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions picker_wrapper_test.go
Expand Up @@ -97,7 +97,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper()
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
// All goroutines should block because picker returns no sc available.
// All goroutines should block because picker returns no subConn available.
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
Expand Down Expand Up @@ -138,7 +138,7 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper()
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because sc is not ready.
// All goroutines should block because subConn is not ready.
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
Expand Down
82 changes: 54 additions & 28 deletions pickfirst.go
Expand Up @@ -44,32 +44,46 @@ func (*pickfirstBuilder) Name() string {
}

type pickfirstBalancer struct {
state connectivity.State
cc balancer.ClientConn
sc balancer.SubConn
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
}

func (b *pickfirstBalancer) ResolverError(err error) {
switch b.state {
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
// Set a failing picker if we don't have a good picker.
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}
if logger.V(2) {
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
}
if b.subConn == nil {
b.state = connectivity.TransientFailure
}

if b.state != connectivity.TransientFailure {
// The picker will not change since the balancer does not currently
// report an error.
return
}
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
if len(cs.ResolverState.Addresses) == 0 {
if b.subConn != nil {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// If the resolver returned empty addresses after it gave us non-empty
// addresses, we need to close the existing subConn. Setting subConn to nil
// here will cause us to return an error picker from ResolverError().
b.cc.RemoveSubConn(b.subConn)
b.subConn = nil
}
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
if b.sc == nil {

if b.subConn == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe invert, early return, and outdent?

if b.subConn != nil {
	b.cc.UpdateAddresses()
	b.subConn.Connect() // do we really need this?
	return
}
...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

And yes, we don't have to call Connect here. UpdateAddresses does that for us if the current active connection is not in the new address list.

var err error
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
b.subConn, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
Expand All @@ -81,38 +95,50 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e
return balancer.ErrBadResolverState
}
b.state = connectivity.Idle
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
b.sc.Connect()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
})
b.subConn.Connect()
} else {
b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
b.sc.Connect()
b.cc.UpdateAddresses(b.subConn, cs.ResolverState.Addresses)
b.subConn.Connect()
}
return nil
}

func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, s balancer.SubConnState) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was OK with the parameter name being short, since the scope is more limited. But if you're changing that, let's also change s to scs or subConnState?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Changed to state both here and in UpdateClientConnState.

if logger.V(2) {
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, s)
}
if b.sc != sc {
if b.subConn != subConn {
if logger.V(2) {
logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")
logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
}
return
}
b.state = s.ConnectivityState
if s.ConnectivityState == connectivity.Shutdown {
b.sc = nil
b.subConn = nil
return
}

switch s.ConnectivityState {
case connectivity.Ready:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.Idle:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
Picker: &idlePicker{subConn: subConn},
})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
Expand All @@ -125,8 +151,8 @@ func (b *pickfirstBalancer) Close() {
}

func (b *pickfirstBalancer) ExitIdle() {
if b.sc != nil && b.state == connectivity.Idle {
b.sc.Connect()
if b.subConn != nil && b.state == connectivity.Idle {
b.subConn.Connect()
}
}

Expand All @@ -142,11 +168,11 @@ func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called.
type idlePicker struct {
sc balancer.SubConn
subConn balancer.SubConn
}

func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
i.sc.Connect()
i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

Expand Down
82 changes: 0 additions & 82 deletions test/balancer_test.go
Expand Up @@ -32,7 +32,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) {
}
}

// TestEmptyAddrs verifies client behavior when a working connection is
// removed. In pick first and round-robin, both will continue using the old
// connections.
Comment on lines -686 to -688
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure there's still a test for this set of circumstances, but that the RPCs begin failing instead of keep using old connections.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I thought we confirmed RR was removing the subconns, so RPCs should have been failing? But this test is validating the RPCs would still succeed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I thought we confirmed RR was removing the subconns, so RPCs should have been failing? But this test is validating the RPCs would still succeed?

Yes, this test was quite broken because it was using the pick_first clientConn to verify round_robin behaviour:

	// Confirm several new RPCs succeed on round robin.
	for i := 0; i < 10; i++ {
		if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
		}
		time.Sleep(5 * time.Millisecond)
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make sure there's still a test for this set of circumstances, but that the RPCs begin failing instead of keep using old connections.

Yes, the newly added test verifies that the channel moves to TransientFailure when addresses are removed and that a WaitForReady RPC blocks until new addresses are added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this test was quite broken because it was using the pick_first clientConn to verify round_robin behaviour:

Oof

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the newly added test verifies that the channel moves to TransientFailure when addresses are removed and that a WaitForReady RPC blocks until new addresses are added.

But only with pick first -- is there one for RR too elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do have a couple of tests for round_robin that I recently cleaned up:
1
2

func (s) TestEmptyAddrs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Initialize server
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceServer(s, ts)
go s.Serve(lis)

// Initialize pickfirst client
pfr := manual.NewBuilderWithScheme("whatever")

pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer pfcc.Close()
pfclient := testpb.NewTestServiceClient(pfcc)

// Confirm we are connected to the server
if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}

// Remove all addresses.
pfr.UpdateState(resolver.State{})

// Initialize roundrobin client
rrr := manual.NewBuilderWithScheme("whatever")

rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer rrcc.Close()
rrclient := testpb.NewTestServiceClient(rrcc)

// Confirm we are connected to the server
if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}

// Remove all addresses.
rrr.UpdateState(resolver.State{})

// Confirm several new RPCs succeed on pick first.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}

// Confirm several new RPCs succeed on round robin.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}
}

func (s) TestWaitForReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
74 changes: 74 additions & 0 deletions test/pickfirst_test.go
Expand Up @@ -26,7 +26,9 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand All @@ -43,6 +45,11 @@ const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}`
// with service config specifying the use of the pick_first LB policy.
func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
t.Helper()

// Initialize channelz. Used to determine pending RPC count.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })

r := manual.NewBuilderWithScheme("whatever")

backends := make([]*stubserver.StubServer, backendCount)
Expand Down Expand Up @@ -258,3 +265,70 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
t.Fatal(err)
}
}

// TestPickFirst_NewAddressWhileBlocking tests the case where pick_first is
// configured on a channel, things are working as expected and then a resolver
// updates removes all addresses. An RPC attempted at this point in time will be
// blocked because there are no valid backends. This test verifies that when new
// backends are added, the RPC is able to complete.
func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
cc, r, backends := setupPickFirst(t, 2)
addrs := backendsToAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}

// Send a resolver update with no addresses. This should push the channel into
// TransientFailure.
r.UpdateState(resolver.State{})
for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
if !cc.WaitForStateChange(ctx, state) {
t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
}
}

doneCh := make(chan struct{})
client := testpb.NewTestServiceClient(cc)
go func() {
// The channel is currently in TransientFailure and this RPC will block
// until the channel becomes Ready, which will only happen when we push a
// resolver update with a valid backend address.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Errorf("EmptyCall() = %v, want <nil>", err)
}
close(doneCh)
}()

// Make sure that there is one pending RPC on the ClientConn before attempting
// to push new addresses through the name resolver. If we don't do this, the
// resolver update can happen before the above goroutine gets to make the RPC.
for {
if err := ctx.Err(); err != nil {
t.Fatal(err)
}
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
t.Fatalf("there should only be one top channel, not %d", len(tcs))
}
started := tcs[0].ChannelData.CallsStarted
completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed
if (started - completed) == 1 {
break
}
time.Sleep(defaultTestShortTimeout)
}

// Send a resolver update with a valid backend to push the channel to Ready
// and unblock the above RPC.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})

select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for blocked RPC to complete")
case <-doneCh:
}
}