Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/ringhash: make reconnection logic work for a single subConn #5601

Merged
merged 4 commits into from Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
153 changes: 153 additions & 0 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -0,0 +1,153 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ringhash_test

import (
"context"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"

_ "google.golang.org/grpc/xds/internal/balancer/ringhash" // Register the ring_hash_experimental LB policy.
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

type testService struct {
testpb.TestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}

// TestRingHash_ReconnectToMoveOutOfTransientFailure tests the case where the
// ring contains a single subConn, and verifies that when the server goes down,
// the LB policy on the client automatically reconnects until the subChannel
// moves out of TRANSIENT_FAILURE.
func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
// Create a restartable listener to simulate server being down.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Start a server backend exposing the test service.
server := grpc.NewServer()
defer server.Stop()
testgrpc.RegisterTestServiceServer(server, &testService{})
go func() {
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()

// Create a clientConn with a manual resolver (which is used to push the
// address of the test backend), and a default service config pointing to
// the use of the ring_hash_experimental LB policy.
const ringHashServiceConfig = `{"loadBalancingConfig": [{"ring_hash_experimental":{}}]}`
r := manual.NewBuilderWithScheme("whatever")
dopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithResolvers(r),
grpc.WithDefaultServiceConfig(ringHashServiceConfig),
}
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

// Push the address of the test backend through the manual resolver.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Stopping the server listener will close the transport on the client,
// which will lead to the channel eventually moving to IDLE. The ring_hash
// LB policy is not expected to reconnect by itself at this point.
lis.Stop()
for state := cc.GetState(); state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.Idle, err)
}

// Make an RPC to get the ring_hash LB policy to reconnect and thereby move
// to TRANSIENT_FAILURE upon connection failure.
client.EmptyCall(ctx, &testpb.Empty{})
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if cc.GetState() == connectivity.TransientFailure {
break
}
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach %q after server shutdown: %v", connectivity.TransientFailure, err)
}

// An RPC at this point is expected to fail.
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatal("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE")
}

// Restart the server listener. The ring_hash LB polcy is expected to
// attempt to reconnect on its own and come out of TRANSIENT_FAILURE, even
// without an RPC attempt.
lis.Restart()
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
if cc.GetState() == connectivity.Ready {
break
}
}
if err := ctx.Err(); err != nil {
t.Fatalf("Timeout waiting for channel to reach READT after server restart: %v", err)
}

// An RPC at this point is expected to fail.
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
4 changes: 4 additions & 0 deletions xds/internal/balancer/ringhash/logging.go
Expand Up @@ -32,3 +32,7 @@ var logger = grpclog.Component("xds")
func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

func subConnPrefixLogger(p *ringhashBalancer, sc *subConn) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)+fmt.Sprintf("[subConn %p] ", sc))
}
30 changes: 23 additions & 7 deletions xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -68,6 +68,7 @@ type subConn struct {
addr string
weight uint32
sc balancer.SubConn
logger *grpclog.PrefixLogger

mu sync.RWMutex
// This is the actual state of this SubConn (as updated by the ClientConn).
Expand Down Expand Up @@ -117,6 +118,7 @@ func (sc *subConn) setState(s connectivity.State) {
// Trigger Connect() if new state is Idle, and there is a queued connect.
if sc.connectQueued {
sc.connectQueued = false
sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state)
sc.sc.Connect()
} else {
sc.attemptingToConnect = false
Expand Down Expand Up @@ -161,11 +163,13 @@ func (sc *subConn) queueConnect() {
defer sc.mu.Unlock()
sc.attemptingToConnect = true
if sc.state == connectivity.Idle {
sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the exact same text here could make it hard to distinguish which operation is occurring. Did we just queue the connect attempt, or was it queued when the subconn went IDLE? Maybe this distinction is unimportant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The connection attempt is queued (by calling queueConnect() method) from UpdateSubConnState() when we want to start connecting on the next available subConn.

But this queueConnect() method actually starts connecting only if the subConn is in IDLE. For all other states, it set the connectQueued bit to true, and when that subConn eventually transitions to IDLE, the connect attempt is made. See setState() which is called from UpdateSubConnState().

I think the distinction is not important here, whether the connection attempt happened right when it was queued or when it eventually moved to IDLE. What do you think?

sc.sc.Connect()
return
}
// Queue this connect, and when this SubConn switches back to Idle (happens
// after backoff in TransientFailure), it will Connect().
sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state)
sc.connectQueued = true
}

Expand Down Expand Up @@ -216,10 +220,11 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
if val, ok := b.subConns.Get(addr); !ok {
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
b.logger.Warningf("Failed to create new SubConn: %v", err)
dfawley marked this conversation as resolved.
Show resolved Hide resolved
continue
}
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
scs.logger = subConnPrefixLogger(b, scs)
scs.setState(connectivity.Idle)
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns.Set(addr, scs)
Expand Down Expand Up @@ -328,15 +333,18 @@ func (b *ringhashBalancer) ResolverError(err error) {
// for some RPCs.
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
b.logger.Infof("handle SubConn state change: %p, %v", sc, s)
if logger.V(2) {
b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
}
scs, ok := b.scStates[sc]
if !ok {
b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s)
b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
return
}
oldSCState := scs.effectiveState()
scs.setState(s)
newSCState := scs.effectiveState()
b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)

var sendUpdate bool
oldBalancerState := b.state
Expand All @@ -353,22 +361,23 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Ready:
// Resend the picker, there's no need to regenerate the picker because
// the ring didn't change.
// We need to regenerate the picker even if the ring has not changed
// because we could be moving from TRANSIENT_FAILURE to READY, in which
// case, we need to update the error picker returned earlier.
b.regeneratePicker()
sendUpdate = true
case connectivity.TransientFailure:
// Save error to be reported via picker.
b.connErr = state.ConnectionError
// Regenerate picker to update error message.
b.regeneratePicker()
sendUpdate = true
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}

if sendUpdate {
b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

Expand Down Expand Up @@ -399,7 +408,14 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
sc := nextSkippingDuplicatesSubConn(b.ring, scs)
if sc != nil {
sc.queueConnect()
return
}
// This handles the edge case where we have a single subConn in the
// ring. nextSkippingDuplicatesSubCon() would have returned nil. We
// still need to ensure that some subConn is attempting to connect, in
// order to give the LB policy a chance to move out of
// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
scs.queueConnect()
}
}

Expand Down