xds/ringhash: make reconnection logic work for a single subConn (#5601)
easwars committed Aug 26, 2022
1 parent b225dda commit d5dee5f
Showing 3 changed files with 180 additions and 7 deletions.
153 changes: 153 additions & 0 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -0,0 +1,153 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ringhash_test

import (
"context"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"

_ "google.golang.org/grpc/xds/internal/balancer/ringhash" // Register the ring_hash_experimental LB policy.
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

type testService struct {
testpb.TestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}

// TestRingHash_ReconnectToMoveOutOfTransientFailure tests the case where the
// ring contains a single subConn, and verifies that when the server goes down,
// the LB policy on the client automatically reconnects until the subChannel
// moves out of TRANSIENT_FAILURE.
func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
// Create a restartable listener to simulate server being down.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Start a server backend exposing the test service.
server := grpc.NewServer()
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

// Create a clientConn with a manual resolver (used to push the address of
// the test backend), and a default service config selecting the
// ring_hash_experimental LB policy.
const ringHashServiceConfig = `{"loadBalancingConfig": [{"ring_hash_experimental":{}}]}`
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(ringHashServiceConfig),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

// Push the address of the test backend through the manual resolver.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Stopping the server listener will close the transport on the client,
// which will lead to the channel eventually moving to IDLE. The ring_hash
// LB policy is not expected to reconnect by itself at this point.
lis.Stop()
for state := cc.GetState(); state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.Idle, err)
}

// Make an RPC to get the ring_hash LB policy to reconnect and thereby move
// to TRANSIENT_FAILURE upon connection failure.
client.EmptyCall(ctx, &testpb.Empty{})
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if cc.GetState() == connectivity.TransientFailure {
break
}
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.TransientFailure, err)
}

// An RPC at this point is expected to fail.
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatal("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE")
}

// Restart the server listener. The ring_hash LB policy is expected to
// attempt to reconnect on its own and come out of TRANSIENT_FAILURE, even
// without an RPC attempt.
lis.Restart()
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if cc.GetState() == connectivity.Ready {
break
}
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach READT after server restart: %v", err)
}

// An RPC at this point is expected to succeed.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
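
Note: the test above simulates the server going down via testutils.NewRestartableListener, whose implementation is not part of this diff. The sketch below is an illustrative stand-in (not the actual testutils code), assuming the real helper behaves roughly like this: the listening address stays fixed, Stop() closes tracked connections and rejects new ones, and Restart() resumes accepting.

package testutilsketch

import (
	"net"
	"sync"
)

// restartableListener is a simplified stand-in for testutils.RestartableListener.
type restartableListener struct {
	net.Listener
	mu      sync.Mutex
	stopped bool
	conns   []net.Conn
}

// Accept hands out connections normally; while "stopped" it closes new
// connections immediately so clients see connection failures even though the
// listening address never changes.
func (l *restartableListener) Accept() (net.Conn, error) {
	for {
		conn, err := l.Listener.Accept()
		if err != nil {
			return nil, err
		}
		l.mu.Lock()
		if l.stopped {
			l.mu.Unlock()
			conn.Close()
			continue
		}
		l.conns = append(l.conns, conn)
		l.mu.Unlock()
		return conn, nil
	}
}

// Stop closes all tracked connections and rejects new ones until Restart.
func (l *restartableListener) Stop() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.stopped = true
	for _, c := range l.conns {
		c.Close()
	}
	l.conns = nil
}

// Restart resumes accepting connections on the same address.
func (l *restartableListener) Restart() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.stopped = false
}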
4 changes: 4 additions & 0 deletions xds/internal/balancer/ringhash/logging.go
@@ -32,3 +32,7 @@ var logger = grpclog.Component("xds")
func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

func subConnPrefixLogger(p *ringhashBalancer, sc *subConn) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)+fmt.Sprintf("[subConn %p] ", sc))
}
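
For illustration, the new subConnPrefixLogger composes a two-level prefix: the balancer prefix (the prefix constant defined earlier in logging.go, assumed here to look like "[ring-hash-lb %p] ") followed by a subConn-specific segment. A small, self-contained sketch of the resulting log-line shape:

package main

import "fmt"

func main() {
	// Assumed balancer prefix format; the real constant lives in logging.go.
	const prefix = "[ring-hash-lb %p] "
	type balancer struct{}
	type subConn struct{}
	b, sc := &balancer{}, &subConn{}

	// Mirrors subConnPrefixLogger: balancer prefix + "[subConn %p] ".
	scPrefix := fmt.Sprintf(prefix, b) + fmt.Sprintf("[subConn %p] ", sc)
	fmt.Println(scPrefix + "Queueing a connect for subConn in state: TRANSIENT_FAILURE")
}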
30 changes: 23 additions & 7 deletions xds/internal/balancer/ringhash/ringhash.go
@@ -68,6 +68,7 @@ type subConn struct {
addr string
weight uint32
sc balancer.SubConn
logger *grpclog.PrefixLogger

mu sync.RWMutex
// This is the actual state of this SubConn (as updated by the ClientConn).
@@ -117,6 +118,7 @@ func (sc *subConn) setState(s connectivity.State) {
// Trigger Connect() if new state is Idle, and there is a queued connect.
if sc.connectQueued {
sc.connectQueued = false
sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state)
sc.sc.Connect()
} else {
sc.attemptingToConnect = false
@@ -161,11 +163,13 @@ func (sc *subConn) queueConnect() {
defer sc.mu.Unlock()
sc.attemptingToConnect = true
if sc.state == connectivity.Idle {
sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state)
sc.sc.Connect()
return
}
// Queue this connect, and when this SubConn switches back to Idle (happens
// after backoff in TransientFailure), it will Connect().
sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state)
sc.connectQueued = true
}

@@ -216,10 +220,11 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
if val, ok := b.subConns.Get(addr); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
b.logger.Warningf("Failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
scs.logger = subConnPrefixLogger(b, scs)
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns.Set(addr, scs)
@@ -328,15 +333,18 @@ func (b *ringhashBalancer) ResolverError(err error) {
// for some RPCs.
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
b.logger.Infof("handle SubConn state change: %p, %v", sc, s)
if logger.V(2) {
b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
}
scs, ok := b.scStates[sc]
if !ok {
b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s)
b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
return
}
oldSCState := scs.effectiveState()
scs.setState(s)
newSCState := scs.effectiveState()
b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)

var sendUpdate bool
oldBalancerState := b.state
@@ -353,22 +361,23 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Ready:
// Resend the picker, there's no need to regenerate the picker because
// the ring didn't change.
// We need to regenerate the picker even if the ring has not changed
// because we could be moving from TRANSIENT_FAILURE to READY, in which
// case, we need to update the error picker returned earlier.
b.regeneratePicker()
sendUpdate = true
case connectivity.TransientFailure:
// Save error to be reported via picker.
b.connErr = state.ConnectionError
// Regenerate picker to update error message.
b.regeneratePicker()
sendUpdate = true
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}

if sendUpdate {
b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

@@ -399,7 +408,14 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
sc := nextSkippingDuplicatesSubConn(b.ring, scs)
if sc != nil {
sc.queueConnect()
return
}
// This handles the edge case where we have a single subConn in the
// ring. nextSkippingDuplicatesSubConn() would have returned nil. We
// still need to ensure that some subConn is attempting to connect, in
// order to give the LB policy a chance to move out of
// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
scs.queueConnect()
}
}
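
The fallback added at the end of UpdateSubConnState above is needed because nextSkippingDuplicatesSubConn (not shown in this diff) walks the ring looking for an entry backed by a different subConn, and returns nil when every entry is backed by the same one, i.e. when the ring holds a single subConn. A simplified, hypothetical sketch of that lookup behavior (names and signature here are illustrative, not the actual ring implementation):

// nextDifferentSubConn is a simplified stand-in for the real lookup: given the
// ring's subConns in order, return the first one that differs from cur, or nil
// if none exists (the single-subConn case handled by the fallback above).
func nextDifferentSubConn(scs []*subConn, cur *subConn) *subConn {
	for _, sc := range scs {
		if sc != cur {
			return sc
		}
	}
	// Every ring entry points at cur: there is no other subConn to ask to
	// connect, so the caller falls back to queueConnect() on cur itself.
	return nil
}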
