Skip to content

Commit

Permalink
grpc: move to TransientFailure from pickfirst LB policy when all addr…
Browse files Browse the repository at this point in the history
…esses are removed
  • Loading branch information
easwars committed Mar 25, 2022
1 parent 3a74cd5 commit cdf3089
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 89 deletions.
28 changes: 21 additions & 7 deletions pickfirst.go
Expand Up @@ -50,23 +50,37 @@ type pickfirstBalancer struct {
}

// ResolverError handles an error reported by the name resolver.
//
// The error is only surfaced to RPCs when the balancer has nothing better to
// offer:
//   - if there is no subConn (b.sc == nil), the balancer is moved to
//     TransientFailure and a failing picker carrying err is installed;
//   - if the balancer is already in TransientFailure, the failing picker is
//     refreshed so that it reports err;
//   - in every other state the current picker is left untouched.
//
// The state is evaluated exactly once and the ClientConn is updated at most
// once per call.
func (b *pickfirstBalancer) ResolverError(err error) {
	if logger.V(2) {
		logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
	}
	if b.sc == nil {
		// Without a subConn there is no connection that could serve RPCs, so
		// the balancer is treated as failing from here on.
		b.state = connectivity.TransientFailure
	}

	if b.state != connectivity.TransientFailure {
		// The picker will not change since the balancer does not currently
		// report an error.
		return
	}
	b.cc.UpdateState(balancer.State{
		ConnectivityState: connectivity.TransientFailure,
		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)},
	})
}

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {
if len(cs.ResolverState.Addresses) == 0 {
if b.sc != nil {
// If the resolver returned empty addresses after it gave us non-empty
// addresses, we need to close the existing subConn. Setting sc to nil
// here will cause us to return an error picker from ResolverError().
b.cc.RemoveSubConn(b.sc)
b.sc = nil
}
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}

if b.sc == nil {
var err error
b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})
Expand Down
82 changes: 0 additions & 82 deletions test/balancer_test.go
Expand Up @@ -32,7 +32,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) {
}
}

// TestEmptyAddrs verifies client behavior when the resolver removes all
// addresses after a working connection has been established. It dials the
// same local server through two channels (one using the default policy, which
// the test labels pick first, and one configured for round-robin), pushes an
// empty resolver state on each, and then expects further RPCs to keep
// succeeding, i.e. the old connections continue to be used.
//
// NOTE(review): after the addresses are removed, only pfclient is exercised;
// see the note on the final loop.
func (s) TestEmptyAddrs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Initialize server: a single backend on an ephemeral localhost port whose
// UnaryCall always replies with Username == one.
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceServer(s, ts)
go s.Serve(lis)

// Initialize pickfirst client. No service config is supplied here, so this
// channel relies on the default LB policy.
pfr := manual.NewBuilderWithScheme("whatever")

pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer pfcc.Close()
pfclient := testpb.NewTestServiceClient(pfcc)

// Confirm we are connected to the server
if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}

// Remove all addresses from the pick first channel.
pfr.UpdateState(resolver.State{})

// Initialize roundrobin client. The policy is selected through the default
// service config passed at dial time.
rrr := manual.NewBuilderWithScheme("whatever")

rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer rrcc.Close()
rrclient := testpb.NewTestServiceClient(rrcc)

// Confirm we are connected to the server
if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}

// Remove all addresses from the round-robin channel.
rrr.UpdateState(resolver.State{})

// Confirm several new RPCs succeed on pick first.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}

// Confirm several new RPCs succeed on round robin.
// NOTE(review): this loop is labelled round robin but issues its RPCs on
// pfclient, so rrclient is never exercised after its addresses are removed.
// This looks like a copy-paste slip; confirm what round robin is expected to
// do on an empty address list before switching the call to rrclient.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}
}

func (s) TestWaitForReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
74 changes: 74 additions & 0 deletions test/pickfirst_test.go
Expand Up @@ -26,7 +26,9 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand All @@ -43,6 +45,11 @@ const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}`
// with service config specifying the use of the pick_first LB policy.
func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
t.Helper()

// Initialize channelz. Used to determine pending RPC count.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })

r := manual.NewBuilderWithScheme("whatever")

backends := make([]*stubserver.StubServer, backendCount)
Expand Down Expand Up @@ -258,3 +265,70 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
t.Fatal(err)
}
}

// TestPickFirst_NewAddressWhileBlocking covers the scenario where a channel
// using pick_first is working normally and a resolver update then removes all
// addresses. A wait-for-ready RPC issued at that point blocks because there is
// no valid backend. The test verifies that the RPC completes once a resolver
// update adds a backend again.
func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
	cc, r, backends := setupPickFirst(t, 2)
	allAddrs := backendsToAddrs(backends)
	r.UpdateState(resolver.State{Addresses: allAddrs})

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := checkPickFirst(ctx, cc, allAddrs[0].Addr); err != nil {
		t.Fatal(err)
	}

	// Push an empty address list and wait until the channel reports
	// TransientFailure.
	r.UpdateState(resolver.State{})
	for {
		cur := cc.GetState()
		if cur == connectivity.TransientFailure {
			break
		}
		if !cc.WaitForStateChange(ctx, cur) {
			t.Fatalf("timeout waiting for state change. got %v; want %v", cur, connectivity.TransientFailure)
		}
	}

	rpcDone := make(chan struct{})
	stub := testpb.NewTestServiceClient(cc)
	go func() {
		// With the channel in TransientFailure, this wait-for-ready RPC stays
		// blocked until the channel turns Ready, which requires a resolver
		// update carrying a valid backend address.
		_, rpcErr := stub.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		if rpcErr != nil {
			t.Errorf("EmptyCall() = %v, want <nil>", rpcErr)
		}
		close(rpcDone)
	}()

	// Poll channelz until exactly one RPC is pending on the ClientConn.
	// Otherwise the resolver update below could be delivered before the
	// goroutine above has actually started its RPC.
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			t.Fatal(ctxErr)
		}
		topChans, _ := channelz.GetTopChannels(0, 0)
		if got := len(topChans); got != 1 {
			t.Fatalf("there should only be one top channel, not %d", got)
		}
		data := topChans[0].ChannelData
		finished := data.CallsSucceeded + data.CallsFailed
		if data.CallsStarted-finished == 1 {
			break
		}
		time.Sleep(defaultTestShortTimeout)
	}

	// Hand the channel a valid backend again; it should become Ready and
	// release the blocked RPC.
	restored := []resolver.Address{{Addr: backends[0].Address}}
	r.UpdateState(resolver.State{Addresses: restored})

	select {
	case <-rpcDone:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for blocked RPC to complete")
	}
}

0 comments on commit cdf3089

Please sign in to comment.