diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
new file mode 100644
index 00000000000..19a7aafb73b
--- /dev/null
+++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -0,0 +1,153 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package ringhash_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/testutils"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
+
+	testgrpc "google.golang.org/grpc/test/grpc_testing"
+	testpb "google.golang.org/grpc/test/grpc_testing"
+
+	_ "google.golang.org/grpc/xds/internal/balancer/ringhash" // Register the ring_hash_experimental LB policy.
+)
+
+type s struct {
+	grpctest.Tester
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+const (
+	defaultTestTimeout      = 10 * time.Second
+	defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
+)
+
+type testService struct {
+	testpb.TestServiceServer
+}
+
+func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
+	return &testpb.Empty{}, nil
+}
+
+// TestRingHash_ReconnectToMoveOutOfTransientFailure tests the case where the
+// ring contains a single subConn, and verifies that when the server goes down,
+// the LB policy on the client automatically reconnects until the subChannel
+// moves out of TRANSIENT_FAILURE.
+func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
+	// Create a restartable listener to simulate server being down.
+	l, err := testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
+	}
+	lis := testutils.NewRestartableListener(l)
+
+	// Start a server backend exposing the test service.
+	server := grpc.NewServer()
+	defer server.Stop()
+	testgrpc.RegisterTestServiceServer(server, &testService{})
+	go func() {
+		if err := server.Serve(lis); err != nil {
+			t.Errorf("Serve() failed: %v", err)
+		}
+	}()
+
+	// Create a clientConn with a manual resolver (which is used to push the
+	// address of the test backend), and a default service config pointing to
+	// the use of the ring_hash_experimental LB policy.
+	const ringHashServiceConfig = `{"loadBalancingConfig": [{"ring_hash_experimental":{}}]}`
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithDefaultServiceConfig(ringHashServiceConfig),
+	}
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("failed to dial local test server: %v", err)
+	}
+	defer cc.Close()
+
+	// Push the address of the test backend through the manual resolver.
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	client := testgrpc.NewTestServiceClient(cc)
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Fatalf("rpc EmptyCall() failed: %v", err)
+	}
+
+	// Stopping the server listener will close the transport on the client,
+	// which will lead to the channel eventually moving to IDLE. The ring_hash
+	// LB policy is not expected to reconnect by itself at this point.
+	lis.Stop()
+	for state := cc.GetState(); state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
+	}
+	if err := ctx.Err(); err != nil {
+		t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.Idle, err)
+	}
+
+	// Make an RPC to get the ring_hash LB policy to reconnect and thereby move
+	// to TRANSIENT_FAILURE upon connection failure.
+	client.EmptyCall(ctx, &testpb.Empty{})
+	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
+		if cc.GetState() == connectivity.TransientFailure {
+			break
+		}
+	}
+	if err := ctx.Err(); err != nil {
+		t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.TransientFailure, err)
+	}
+
+	// An RPC at this point is expected to fail.
+	if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
+		t.Fatal("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE")
+	}
+
+	// Restart the server listener. The ring_hash LB policy is expected to
+	// attempt to reconnect on its own and come out of TRANSIENT_FAILURE, even
+	// without an RPC attempt.
+	lis.Restart()
+	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
+		if cc.GetState() == connectivity.Ready {
+			break
+		}
+	}
+	if err := ctx.Err(); err != nil {
+		t.Fatalf("Timeout waiting for channel to reach READY after server restart: %v", err)
+	}
+
+	// An RPC at this point is expected to succeed.
+	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
+		t.Fatalf("rpc EmptyCall() failed: %v", err)
+	}
+}
diff --git a/xds/internal/balancer/ringhash/logging.go b/xds/internal/balancer/ringhash/logging.go
index 64a1d467f55..3e0f0adf58e 100644
--- a/xds/internal/balancer/ringhash/logging.go
+++ b/xds/internal/balancer/ringhash/logging.go
@@ -32,3 +32,7 @@ var logger = grpclog.Component("xds")
 func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger {
 	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
 }
+
+func subConnPrefixLogger(p *ringhashBalancer, sc *subConn) *internalgrpclog.PrefixLogger {
+	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)+fmt.Sprintf("[subConn %p] ", sc))
+}
diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go
index e2ad49fca4a..8056b29c127 100644
--- a/xds/internal/balancer/ringhash/ringhash.go
+++ b/xds/internal/balancer/ringhash/ringhash.go
@@ -68,6 +68,7 @@ type subConn struct {
 	addr   string
 	weight uint32
 	sc     balancer.SubConn
+	logger *grpclog.PrefixLogger
 
 	mu sync.RWMutex
 	// This is the actual state of this SubConn (as updated by the ClientConn).
@@ -117,6 +118,7 @@ func (sc *subConn) setState(s connectivity.State) {
 		// Trigger Connect() if new state is Idle, and there is a queued connect.
 		if sc.connectQueued {
 			sc.connectQueued = false
+			sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state)
 			sc.sc.Connect()
 		} else {
 			sc.attemptingToConnect = false
 		}
@@ -161,11 +163,13 @@ func (sc *subConn) queueConnect() {
 	defer sc.mu.Unlock()
 	sc.attemptingToConnect = true
 	if sc.state == connectivity.Idle {
+		sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state)
 		sc.sc.Connect()
 		return
 	}
 	// Queue this connect, and when this SubConn switches back to Idle (happens
 	// after backoff in TransientFailure), it will Connect().
+	sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state)
 	sc.connectQueued = true
 }
 
@@ -216,10 +220,11 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 		if val, ok := b.subConns.Get(addr); !ok {
 			sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
 			if err != nil {
-				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
+				b.logger.Warningf("Failed to create new SubConn: %v", err)
 				continue
 			}
 			scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
+			scs.logger = subConnPrefixLogger(b, scs)
 			scs.setState(connectivity.Idle)
 			b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
 			b.subConns.Set(addr, scs)
@@ -328,15 +333,18 @@ func (b *ringhashBalancer) ResolverError(err error) {
 // for some RPCs.
 func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
 	s := state.ConnectivityState
-	b.logger.Infof("handle SubConn state change: %p, %v", sc, s)
+	if logger.V(2) {
+		b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
+	}
 	scs, ok := b.scStates[sc]
 	if !ok {
-		b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s)
+		b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
 		return
 	}
 	oldSCState := scs.effectiveState()
 	scs.setState(s)
 	newSCState := scs.effectiveState()
+	b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)
 
 	var sendUpdate bool
 	oldBalancerState := b.state
@@ -353,15 +361,15 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
 		// No need to send an update. No queued RPC can be unblocked. If the
 		// overall state changed because of this, sendUpdate is already true.
 	case connectivity.Ready:
-		// Resend the picker, there's no need to regenerate the picker because
-		// the ring didn't change.
+		// We need to regenerate the picker even if the ring has not changed
+		// because we could be moving from TRANSIENT_FAILURE to READY, in which
+		// case, we need to update the error picker returned earlier.
+		b.regeneratePicker()
 		sendUpdate = true
 	case connectivity.TransientFailure:
 		// Save error to be reported via picker.
 		b.connErr = state.ConnectionError
-		// Regenerate picker to update error message.
 		b.regeneratePicker()
-		sendUpdate = true
 	case connectivity.Shutdown:
 		// When an address was removed by resolver, b called RemoveSubConn but
 		// kept the sc's state in scStates. Remove state for this sc here.
@@ -369,6 +377,7 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
 	}
 
 	if sendUpdate {
+		b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
 		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
 	}
 
@@ -399,7 +408,14 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
 		sc := nextSkippingDuplicatesSubConn(b.ring, scs)
 		if sc != nil {
 			sc.queueConnect()
+			return
 		}
+		// This handles the edge case where we have a single subConn in the
+		// ring. nextSkippingDuplicatesSubConn() would have returned nil. We
+		// still need to ensure that some subConn is attempting to connect, in
+		// order to give the LB policy a chance to move out of
+		// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
+		scs.queueConnect()
 	}
 }