xds: add retry support #4738

Merged
merged 4 commits on Sep 8, 2021
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
@@ -49,7 +49,7 @@ jobs:

- type: tests
goversion: 1.17
grpcenv: GRPC_GO_RETRY=on
grpcenv: GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY=true

- type: extras
goversion: 1.17
6 changes: 4 additions & 2 deletions internal/envconfig/envconfig.go
@@ -22,6 +22,8 @@ package envconfig
import (
"os"
"strings"

xdsenv "google.golang.org/grpc/internal/xds/env"
)

const (
@@ -31,8 +33,8 @@ const (
)

var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on" or
// if xDS retry support is enabled via "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY=true".
Retry = strings.EqualFold(os.Getenv(retryStr), "on") || xdsenv.RetrySupport
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
)
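
As a quick illustration of the combined gate above, here is a minimal standalone sketch (not library code) that evaluates retry enablement the same way envconfig now does: the classic flag is keyed on "on", the xDS flag on "true".

package main

import (
	"fmt"
	"os"
	"strings"
)

func main() {
	// Classic retry gate: GRPC_GO_RETRY must be "on" (case-insensitive).
	goRetry := strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on")
	// xDS retry gate: GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY must be "true".
	xdsRetry := strings.EqualFold(os.Getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"), "true")
	// envconfig.Retry is the OR of the two, so enabling xDS retry also
	// enables the underlying retry machinery.
	fmt.Println("retry enabled:", goRetry || xdsRetry)
}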
4 changes: 4 additions & 0 deletions internal/xds/env/env.go
@@ -41,6 +41,7 @@ const (

ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
retrySupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"

c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
@@ -70,6 +71,9 @@ var (
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")

// RetrySupport indicates whether xDS retry is enabled. This can be
// enabled by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY" to "true".
RetrySupport = strings.EqualFold(os.Getenv(retrySupportEnv), "true")

// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
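One operational detail worth noting: RetrySupport, like the other flags in this file, is a package-level variable, so it is evaluated once at package initialization. The standalone sketch below (not library code) shows why exporting the variable after startup has no effect.

package main

import (
	"fmt"
	"os"
	"strings"
)

// Evaluated once, when the package is initialized -- mirrors env.RetrySupport.
var retrySupport = strings.EqualFold(os.Getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"), "true")

func main() {
	fmt.Println("at startup:", retrySupport)

	// Too late: the package-level var was already computed at init time.
	os.Setenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY", "true")
	fmt.Println("after Setenv:", retrySupport) // unchanged
}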
3 changes: 2 additions & 1 deletion test/retry_test.go
@@ -113,7 +113,8 @@ func (s) TestRetryUnary(t *testing.T) {
}

func (s) TestRetryDisabledByDefault(t *testing.T) {
if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") {
if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") ||
strings.EqualFold(os.Getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"), "true") {
return
}
i := -1
27 changes: 25 additions & 2 deletions xds/internal/resolver/serviceconfig.go
@@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/metadata"
@@ -107,6 +108,8 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
type virtualHost struct {
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
// retry policy present in virtual host
retryConfig *xdsclient.RetryConfig
}

// routeCluster holds information about a cluster as referenced by a route.
@@ -122,6 +125,7 @@ type route struct {
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
retryConfig *xdsclient.RetryConfig
hashPolicies []*xdsclient.HashPolicy
}

@@ -195,10 +199,25 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
if rt.maxStreamDuration != 0 {
config.MethodConfig.Timeout = &rt.maxStreamDuration
}
if rt.retryConfig != nil {
config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
} else if cs.virtualHost.retryConfig != nil {
config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
}

return config, nil
}

func retryConfigToPolicy(config *xdsclient.RetryConfig) *serviceconfig.RetryPolicy {
return &serviceconfig.RetryPolicy{
MaxAttempts: int(config.NumRetries) + 1,
InitialBackoff: config.RetryBackoff.BaseInterval,
MaxBackoff: config.RetryBackoff.MaxInterval,
BackoffMultiplier: 2,
RetryableStatusCodes: config.RetryOn,
}
}

func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsclient.HashPolicy) uint64 {
var hash uint64
var generatedHash bool
@@ -322,8 +341,11 @@ var newWRR = wrr.NewRandom
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride},
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
@@ -361,6 +383,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
}

cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}

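To make the conversion above concrete: an xDS policy with NumRetries=2 becomes a service-config RetryPolicy with MaxAttempts=3 (retries plus the original attempt), and the route-level policy, when present, wins over the virtual host's, mirroring SelectConfig. The following is a self-contained sketch with stand-in types; the real xdsclient.RetryConfig and serviceconfig.RetryPolicy are defined elsewhere in this PR, and the backoff values are illustrative only.

package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
)

// Stand-ins for xdsclient.RetryConfig/RetryBackoff and
// serviceconfig.RetryPolicy, reduced to the fields used by this PR.
type retryBackoff struct{ BaseInterval, MaxInterval time.Duration }

type retryConfig struct {
	RetryOn      map[codes.Code]bool
	NumRetries   uint32
	RetryBackoff retryBackoff
}

type retryPolicy struct {
	MaxAttempts                int
	InitialBackoff, MaxBackoff time.Duration
	BackoffMultiplier          float64
	RetryableStatusCodes       map[codes.Code]bool
}

func toPolicy(c *retryConfig) *retryPolicy {
	return &retryPolicy{
		MaxAttempts:          int(c.NumRetries) + 1, // N retries = N+1 total attempts
		InitialBackoff:       c.RetryBackoff.BaseInterval,
		MaxBackoff:           c.RetryBackoff.MaxInterval,
		BackoffMultiplier:    2, // xDS policies carry no multiplier; the PR hardcodes 2
		RetryableStatusCodes: c.RetryOn,
	}
}

func main() {
	vh := &retryConfig{
		NumRetries: 2,
		RetryOn:    map[codes.Code]bool{codes.Unavailable: true},
		RetryBackoff: retryBackoff{
			BaseInterval: 25 * time.Millisecond,
			MaxInterval:  250 * time.Millisecond,
		},
	}
	var route *retryConfig // nil: no route-level override in this example

	// Same precedence as SelectConfig: the route policy wins if present.
	chosen := route
	if chosen == nil {
		chosen = vh
	}
	fmt.Printf("%+v\n", toPolicy(chosen)) // MaxAttempts:3 InitialBackoff:25ms ...
}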
161 changes: 158 additions & 3 deletions xds/internal/test/xds_client_integration_test.go
@@ -28,10 +28,16 @@ import (
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
testpb "google.golang.org/grpc/test/grpc_testing"
)

@@ -42,10 +48,10 @@
// Returns the following:
// - the port the server is listening on
// - cleanup function to be invoked by the tests when done
func clientSetup(t *testing.T) (uint32, func()) {
func clientSetup(t *testing.T, tss testpb.TestServiceServer) (uint32, func()) {
// Initialize a gRPC server and register the stubServer on it.
server := grpc.NewServer()
testpb.RegisterTestServiceServer(server, &testService{})
testpb.RegisterTestServiceServer(server, tss)

// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
@@ -68,7 +74,7 @@ func (s) TestClientSideXDS(t *testing.T) {
managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

port, cleanup2 := clientSetup(t)
port, cleanup2 := clientSetup(t, &testService{})
defer cleanup2()

const serviceName = "my-service-client-side-xds"
@@ -97,3 +103,152 @@
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}

func (s) TestClientSideRetry(t *testing.T) {
if !env.RetrySupport {
// Skip this test if retry is not enabled.
return
}

ctr := 0
errs := []codes.Code{codes.ResourceExhausted}
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
defer func() { ctr++ }()
if ctr < len(errs) {
return nil, status.Errorf(errs[ctr], "this should be retried")
}
return &testpb.Empty{}, nil
},
}

managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

port, cleanup2 := clientSetup(t, ss)
defer cleanup2()

const serviceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: port,
SecLevel: e2e.SecurityLevelNone,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create a ClientConn and make an RPC. No retry policy is configured
// yet, so the server's initial error should reach the client.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testpb.NewTestServiceClient(cc)
defer cancel()
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.ResourceExhausted {
t.Fatalf("rpc EmptyCall() = _, %v; want _, ResourceExhausted", err)
}

testCases := []struct {
name string
vhPolicy *v3routepb.RetryPolicy
routePolicy *v3routepb.RetryPolicy
errs []codes.Code // the errors returned by the server for each RPC
tryAgainErr codes.Code // the error that would be returned if we are still using the old retry policies.
errWant codes.Code
}{{
name: "virtualHost only, fail",
vhPolicy: &v3routepb.RetryPolicy{
RetryOn: "resource-exhausted,unavailable",
NumRetries: &wrapperspb.UInt32Value{Value: 1},
},
errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable},
routePolicy: nil,
tryAgainErr: codes.ResourceExhausted,
errWant: codes.Unavailable,
}, {
name: "virtualHost only",
vhPolicy: &v3routepb.RetryPolicy{
RetryOn: "resource-exhausted, unavailable",
NumRetries: &wrapperspb.UInt32Value{Value: 2},
},
errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable},
routePolicy: nil,
tryAgainErr: codes.Unavailable,
errWant: codes.OK,
}, {
name: "virtualHost+route, fail",
vhPolicy: &v3routepb.RetryPolicy{
RetryOn: "resource-exhausted,unavailable",
NumRetries: &wrapperspb.UInt32Value{Value: 2},
},
routePolicy: &v3routepb.RetryPolicy{
RetryOn: "resource-exhausted",
NumRetries: &wrapperspb.UInt32Value{Value: 2},
},
errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable},
tryAgainErr: codes.OK,
errWant: codes.Unavailable,
}, {
name: "virtualHost+route",
vhPolicy: &v3routepb.RetryPolicy{
RetryOn: "resource-exhausted",
NumRetries: &wrapperspb.UInt32Value{Value: 2},
},
routePolicy: &v3routepb.RetryPolicy{
RetryOn: "unavailable",
NumRetries: &wrapperspb.UInt32Value{Value: 2},
},
errs: []codes.Code{codes.Unavailable},
tryAgainErr: codes.Unavailable,
errWant: codes.OK,
}, {
name: "virtualHost+route, not enough attempts",
vhPolicy: &v3routepb.RetryPolicy{
RetryOn: "unavailable",
NumRetries: &wrapperspb.UInt32Value{Value: 2},
},
routePolicy: &v3routepb.RetryPolicy{
RetryOn: "unavailable",
NumRetries: &wrapperspb.UInt32Value{Value: 1},
},
errs: []codes.Code{codes.Unavailable, codes.Unavailable},
tryAgainErr: codes.OK,
errWant: codes.Unavailable,
}}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
errs = tc.errs

// Confirm tryAgainErr is correct before updating resources.
ctr = 0
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != tc.tryAgainErr {
t.Fatalf("with old retry policy: EmptyCall() = _, %v; want _, %v", err, tc.tryAgainErr)
}

resources.Routes[0].VirtualHosts[0].RetryPolicy = tc.vhPolicy
resources.Routes[0].VirtualHosts[0].Routes[0].GetRoute().RetryPolicy = tc.routePolicy
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

for {
ctr = 0
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code == tc.tryAgainErr {
[Review thread on the line above]

@zasweq (Contributor), Sep 7, 2021:
I don't understand this. Why do we need this? If the management server is updated with the new resources, which specify a retry policy, the error returned after all the retries should be tc.errWant, right, never the old retry policy? Otherwise this test makes sense. Or does the old retry come before the new retry?

Author (Member):
The problem is that updating the management server with the new retry policy and sending RPCs from the client are asynchronous operations. We have to wait for the xds stream to send the resource, for the xdsclient to process it and send it to the resolver, and for the resolver to update the config selector being used by the ClientConn. So even though we can synchronously update our server behavior, we can't synchronously update the xds configuration.

@zasweq (Contributor):
Ah, I see, so no waits. I'm guessing this isn't async in other xds client integration tests because you Dial after updating the management server, so by the time you send an RPC it's guaranteed to have the configuration applied? Thanks for the explanation.

Author (Member):
Right. If we update the management server before the channel is started, then we know it will never be fed old data. I could just recreate the channel each time instead and avoid this trouble, but this way makes sure that updates to the policy are working, too.

[End of review thread]
continue
} else if code != tc.errWant {
t.Fatalf("rpc EmptyCall() = _, %v; want _, %v", err, tc.errWant)
}
break
}
})
}
}
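
The for-loop in the test above exists because of the asynchrony discussed in the review thread: management-server updates reach the client eventually, so an RPC may still be governed by the previous retry policy for a short while. Distilled into a generic helper, the pattern might look like the sketch below; waitForCode is a hypothetical name, not part of this test, and it assumes the file's existing imports (context, fmt, codes, status). It terminates cleanly only when stale differs from want, which the test cases guarantee.

// waitForCode issues RPCs via do until the status code stops matching the
// result expected under the previous (stale) configuration, then checks it
// against want. The context deadline bounds the wait.
func waitForCode(ctx context.Context, do func() error, stale, want codes.Code) error {
	for ctx.Err() == nil {
		err := do()
		switch code := status.Code(err); {
		case code == stale:
			// Old configuration may still be in effect; try again.
		case code != want:
			return fmt.Errorf("rpc returned %v; want code %v", err, want)
		default:
			return nil
		}
	}
	return ctx.Err()
}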
20 changes: 20 additions & 0 deletions xds/internal/xdsclient/client.go
@@ -33,6 +33,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/xdsclient/load"
@@ -269,6 +270,24 @@ type VirtualHost struct {
// may be unused if the matching Route contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
RetryConfig *RetryConfig
}

// RetryConfig contains all retry-related configuration in either a VirtualHost
// or Route.
type RetryConfig struct {
// RetryOn is a set of status codes on which to retry. Only Canceled,
// DeadlineExceeded, Internal, ResourceExhausted, and Unavailable are
// supported; any other values will be omitted.
RetryOn map[codes.Code]bool
NumRetries uint32 // maximum number of retry attempts
RetryBackoff RetryBackoff // retry backoff policy
}

// RetryBackoff describes the backoff policy for retries.
type RetryBackoff struct {
BaseInterval time.Duration // initial backoff duration between attempts
MaxInterval time.Duration // maximum backoff duration
}

// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
@@ -339,6 +358,7 @@ type Route struct {
// unused if the matching WeightedCluster contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
RetryConfig *RetryConfig

RouteAction RouteAction
}
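
The RetryOn field above implies a translation step from the xDS RetryPolicy's comma-separated retry_on string into this code set, dropping unsupported tokens as the doc comment describes. A possible sketch of that filtering follows; this is an assumption about the shape of the parser, which actually lives in the client's RDS handling and may differ in detail.

package main

import (
	"fmt"
	"strings"

	"google.golang.org/grpc/codes"
)

// supported maps the retry_on tokens accepted for gRPC retries to status
// codes; any other token (for example Envoy's "5xx") is dropped, per the
// RetryOn doc comment.
var supported = map[string]codes.Code{
	"cancelled":          codes.Canceled,
	"deadline-exceeded":  codes.DeadlineExceeded,
	"internal":           codes.Internal,
	"resource-exhausted": codes.ResourceExhausted,
	"unavailable":        codes.Unavailable,
}

func parseRetryOn(s string) map[codes.Code]bool {
	out := map[codes.Code]bool{}
	for _, tok := range strings.Split(s, ",") {
		if c, ok := supported[strings.TrimSpace(tok)]; ok {
			out[c] = true
		}
	}
	return out
}

func main() {
	// "5xx" is not a supported token, so only two codes survive.
	fmt.Println(parseRetryOn("resource-exhausted, unavailable, 5xx"))
}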