Skip to content

Commit

Permalink
xdsclient: add retry config support
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Sep 7, 2021
1 parent c99a9c1 commit f162c91
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Expand Up @@ -49,7 +49,7 @@ jobs:

- type: tests
goversion: 1.17
grpcenv: GRPC_GO_RETRY=on
grpcenv: GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY=true

- type: extras
goversion: 1.17
Expand Down
6 changes: 4 additions & 2 deletions internal/envconfig/envconfig.go
Expand Up @@ -22,6 +22,8 @@ package envconfig
import (
"os"
"strings"

xdsenv "google.golang.org/grpc/internal/xds/env"
)

const (
Expand All @@ -31,8 +33,8 @@ const (
)

var (
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
Retry = strings.EqualFold(os.Getenv(retryStr), "on")
// Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on" or if xDS retry support is enabled.
Retry = strings.EqualFold(os.Getenv(retryStr), "on") || xdsenv.RetrySupport
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
)
4 changes: 4 additions & 0 deletions internal/xds/env/env.go
Expand Up @@ -42,6 +42,7 @@ const (
ringHashSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
clientSideSecuritySupportEnv = "GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT"
aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
retrySupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"

c2pResolverSupportEnv = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
Expand Down Expand Up @@ -78,6 +79,9 @@ var (
// "true".
AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")

// RetrySupport indicates whether xDS retry is enabled. This can be enabled
// by setting the environment variable
// "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY" to "true" (compared
// case-insensitively).
RetrySupport = strings.EqualFold(os.Getenv(retrySupportEnv), "true")

// C2PResolverSupport indicates whether support for C2P resolver is enabled.
// This can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
Expand Down
3 changes: 2 additions & 1 deletion test/retry_test.go
Expand Up @@ -113,7 +113,8 @@ func (s) TestRetryUnary(t *testing.T) {
}

func (s) TestRetryDisabledByDefault(t *testing.T) {
if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") {
if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") ||
strings.EqualFold(os.Getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"), "true") {
return
}
i := -1
Expand Down
27 changes: 25 additions & 2 deletions xds/internal/resolver/serviceconfig.go
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -107,6 +108,8 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
// virtualHost holds the virtual-host-level settings kept by the config
// selector: the HTTP filter config overrides and the retry configuration.
type virtualHost struct {
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
// retry policy present in virtual host; SelectConfig applies it only when
// the selected route carries no retry policy of its own
retryConfig *xdsclient.RetryConfig
}

// routeCluster holds information about a cluster as referenced by a route.
Expand All @@ -122,6 +125,7 @@ type route struct {
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
retryConfig *xdsclient.RetryConfig
hashPolicies []*xdsclient.HashPolicy
}

Expand Down Expand Up @@ -195,10 +199,25 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
if rt.maxStreamDuration != 0 {
config.MethodConfig.Timeout = &rt.maxStreamDuration
}
if rt.retryConfig != nil {
config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
} else if cs.virtualHost.retryConfig != nil {
config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
}

return config, nil
}

// retryConfigToPolicy translates an xDS retry configuration into the
// equivalent service config retry policy.
//
// MaxAttempts is one more than NumRetries, the backoff bounds are copied from
// the configured RetryBackoff, and the backoff multiplier is always 2. The
// RetryOn map is shared with the returned policy rather than copied.
func retryConfigToPolicy(config *xdsclient.RetryConfig) *serviceconfig.RetryPolicy {
	backoff := config.RetryBackoff

	policy := new(serviceconfig.RetryPolicy)
	policy.MaxAttempts = 1 + int(config.NumRetries)
	policy.InitialBackoff = backoff.BaseInterval
	policy.MaxBackoff = backoff.MaxInterval
	policy.BackoffMultiplier = 2
	policy.RetryableStatusCodes = config.RetryOn
	return policy
}

func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsclient.HashPolicy) uint64 {
var hash uint64
var generatedHash bool
Expand Down Expand Up @@ -322,8 +341,11 @@ var newWRR = wrr.NewRandom
// r.activeClusters for previously-unseen clusters.
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
virtualHost: virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride},
r: r,
virtualHost: virtualHost{
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
Expand Down Expand Up @@ -361,6 +383,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}

cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
cs.routes[i].retryConfig = rt.RetryConfig
cs.routes[i].hashPolicies = rt.HashPolicies
}

Expand Down
158 changes: 155 additions & 3 deletions xds/internal/test/xds_client_integration_test.go
Expand Up @@ -28,10 +28,16 @@ import (
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/xds/env"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"

v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
testpb "google.golang.org/grpc/test/grpc_testing"
)

Expand All @@ -42,10 +48,10 @@ import (
// Returns the following:
// - the port the server is listening on
// - cleanup function to be invoked by the tests when done
func clientSetup(t *testing.T) (uint32, func()) {
func clientSetup(t *testing.T, tss testpb.TestServiceServer) (uint32, func()) {
// Initialize a gRPC server and register the stubServer on it.
server := grpc.NewServer()
testpb.RegisterTestServiceServer(server, &testService{})
testpb.RegisterTestServiceServer(server, tss)

// Create a local listener and pass it to Serve().
lis, err := testutils.LocalTCPListener()
Expand All @@ -68,7 +74,7 @@ func (s) TestClientSideXDS(t *testing.T) {
managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()

port, cleanup2 := clientSetup(t)
port, cleanup2 := clientSetup(t, &testService{})
defer cleanup2()

const serviceName = "my-service-client-side-xds"
Expand Down Expand Up @@ -97,3 +103,149 @@ func (s) TestClientSideXDS(t *testing.T) {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}

// TestClientSideRetry verifies that retry policies delivered through xDS route
// configuration are applied to RPCs, and that a policy set on a route takes
// precedence over the one set on its virtual host.
//
// The stub server fails each attempt of an RPC with the next code in errs and
// succeeds once the list is exhausted, so the final status seen by the client
// reveals which retry policy was in effect for that RPC.
func (s) TestClientSideRetry(t *testing.T) {
	if !env.RetrySupport {
		// Report the test as skipped rather than silently passing when retry
		// is not enabled.
		t.Skip("xDS retry support is not enabled; set GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY=true to run this test")
	}

	// ctr counts the attempts the server has handled for the current RPC. It
	// starts at zero and is reset before each RPC issued by the subtests.
	ctr := 0
	// errs lists the status codes returned for successive attempts.
	errs := []codes.Code{codes.ResourceExhausted}
	ss := &stubserver.StubServer{
		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			defer func() { ctr++ }()
			if ctr < len(errs) {
				return nil, status.Errorf(errs[ctr], "this should be retried")
			}
			return &testpb.Empty{}, nil
		},
	}

	port, cleanup := clientSetup(t, ss)
	defer cleanup()

	const serviceName = "my-service-client-side-xds"
	resources := e2e.DefaultClientResources(e2e.ResourceParams{
		DialTarget: serviceName,
		NodeID:     xdsClientNodeID,
		Host:       "localhost",
		Port:       port,
		SecLevel:   e2e.SecurityLevelNone,
	})
	if err := managementServer.Update(resources); err != nil {
		t.Fatal(err)
	}

	// Create a ClientConn and issue an RPC before any retry policy has been
	// pushed: the server's first error is expected to reach the client
	// without being retried.
	cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolverBuilder))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	client := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("rpc EmptyCall() = _, %v; want _, ResourceExhausted", err)
	}

	// The cases run in order and build on each other: tryAgainErr is the
	// result the previous case's policy produces for this case's errs.
	testCases := []struct {
		name        string
		vhPolicy    *v3routepb.RetryPolicy
		routePolicy *v3routepb.RetryPolicy
		errs        []codes.Code // the errors returned by the server for each RPC
		tryAgainErr codes.Code   // the error that would be returned if we are still using the old retry policies.
		errWant     codes.Code
	}{{
		name: "virtualHost only, fail",
		vhPolicy: &v3routepb.RetryPolicy{
			RetryOn:    "resource-exhausted,unavailable",
			NumRetries: &wrapperspb.UInt32Value{Value: 1},
		},
		errs:        []codes.Code{codes.ResourceExhausted, codes.Unavailable},
		routePolicy: nil,
		tryAgainErr: codes.ResourceExhausted,
		errWant:     codes.Unavailable,
	}, {
		name: "virtualHost only",
		vhPolicy: &v3routepb.RetryPolicy{
			RetryOn:    "resource-exhausted, unavailable",
			NumRetries: &wrapperspb.UInt32Value{Value: 2},
		},
		errs:        []codes.Code{codes.ResourceExhausted, codes.Unavailable},
		routePolicy: nil,
		tryAgainErr: codes.Unavailable,
		errWant:     codes.OK,
	}, {
		name: "virtualHost+route, fail",
		vhPolicy: &v3routepb.RetryPolicy{
			RetryOn:    "resource-exhausted,unavailable",
			NumRetries: &wrapperspb.UInt32Value{Value: 2},
		},
		routePolicy: &v3routepb.RetryPolicy{
			RetryOn:    "resource-exhausted",
			NumRetries: &wrapperspb.UInt32Value{Value: 2},
		},
		errs:        []codes.Code{codes.ResourceExhausted, codes.Unavailable},
		tryAgainErr: codes.OK,
		errWant:     codes.Unavailable,
	}, {
		name: "virtualHost+route",
		vhPolicy: &v3routepb.RetryPolicy{
			RetryOn:    "resource-exhausted",
			NumRetries: &wrapperspb.UInt32Value{Value: 2},
		},
		routePolicy: &v3routepb.RetryPolicy{
			RetryOn:    "unavailable",
			NumRetries: &wrapperspb.UInt32Value{Value: 2},
		},
		errs:        []codes.Code{codes.Unavailable},
		tryAgainErr: codes.Unavailable,
		errWant:     codes.OK,
	}, {
		name: "virtualHost+route, not enough attempts",
		vhPolicy: &v3routepb.RetryPolicy{
			RetryOn:    "unavailable",
			NumRetries: &wrapperspb.UInt32Value{Value: 2},
		},
		routePolicy: &v3routepb.RetryPolicy{
			RetryOn:    "unavailable",
			NumRetries: &wrapperspb.UInt32Value{Value: 1},
		},
		errs:        []codes.Code{codes.Unavailable, codes.Unavailable},
		tryAgainErr: codes.OK,
		errWant:     codes.Unavailable,
	}}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			errs = tc.errs

			// Confirm tryAgainErr is correct before updating resources.
			ctr = 0
			_, err := client.EmptyCall(ctx, &testpb.Empty{})
			if code := status.Code(err); code != tc.tryAgainErr {
				t.Fatalf("with old retry policy: EmptyCall() = _, %v; want _, %v", err, tc.tryAgainErr)
			}

			resources.Routes[0].VirtualHosts[0].RetryPolicy = tc.vhPolicy
			resources.Routes[0].VirtualHosts[0].Routes[0].GetRoute().RetryPolicy = tc.routePolicy
			if err := managementServer.Update(resources); err != nil {
				t.Fatal(err)
			}

			// Keep issuing RPCs until the client stops behaving as it did
			// under the old policy. Any status other than tryAgainErr or
			// errWant (including one caused by ctx expiring) fails the test,
			// which also bounds this loop.
			for {
				ctr = 0
				_, err := client.EmptyCall(ctx, &testpb.Empty{})
				code := status.Code(err)
				if code == tc.tryAgainErr {
					continue
				}
				if code != tc.errWant {
					t.Fatalf("rpc EmptyCall() = _, %v; want _, %v", err, tc.errWant)
				}
				break
			}
		})
	}
}
20 changes: 20 additions & 0 deletions xds/internal/xdsclient/client.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/xdsclient/load"
Expand Down Expand Up @@ -269,6 +270,24 @@ type VirtualHost struct {
// may be unused if the matching Route contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
RetryConfig *RetryConfig
}

// RetryConfig contains all retry-related configuration in either a VirtualHost
// or Route.
type RetryConfig struct {
// RetryOn is a set of status codes on which to retry. Only Canceled,
// DeadlineExceeded, Internal, ResourceExhausted, and Unavailable are
// supported; any other values will be omitted.
RetryOn map[codes.Code]bool
// NumRetries is the maximum number of retry attempts. It does not include
// the original attempt: the xDS resolver converts it to a retry policy
// whose MaxAttempts is NumRetries+1.
NumRetries uint32
// RetryBackoff is the retry backoff policy; its intervals become the
// initial and maximum backoff of the resulting retry policy.
RetryBackoff RetryBackoff
}

// RetryBackoff describes the backoff policy for retries. The xDS resolver
// copies BaseInterval and MaxInterval into the InitialBackoff and MaxBackoff
// of the retry policy it builds, using a fixed backoff multiplier of 2.
type RetryBackoff struct {
BaseInterval time.Duration // initial backoff duration between attempts
MaxInterval time.Duration // maximum backoff duration
}

// HashPolicyType specifies the type of HashPolicy from a received RDS Response.
Expand Down Expand Up @@ -339,6 +358,7 @@ type Route struct {
// unused if the matching WeightedCluster contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
RetryConfig *RetryConfig

RouteAction RouteAction
}
Expand Down

0 comments on commit f162c91

Please sign in to comment.