diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml
index 7bf81546360..9a45ba04085 100644
--- a/.github/workflows/testing.yml
+++ b/.github/workflows/testing.yml
@@ -49,7 +49,7 @@ jobs:
 
         - type: tests
           goversion: 1.17
-          grpcenv: GRPC_GO_RETRY=on
+          grpcenv: GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY=true
 
         - type: extras
           goversion: 1.17
diff --git a/internal/envconfig/envconfig.go b/internal/envconfig/envconfig.go
index 73931a94bca..e766ac04af2 100644
--- a/internal/envconfig/envconfig.go
+++ b/internal/envconfig/envconfig.go
@@ -22,6 +22,8 @@ package envconfig
 import (
     "os"
     "strings"
+
+    xdsenv "google.golang.org/grpc/internal/xds/env"
 )
 
 const (
@@ -31,8 +33,8 @@ const (
 )
 
 var (
-    // Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on".
-    Retry = strings.EqualFold(os.Getenv(retryStr), "on")
+    // Retry is set if retry is explicitly enabled via "GRPC_GO_RETRY=on" or if XDS retry support is enabled.
+    Retry = strings.EqualFold(os.Getenv(retryStr), "on") || xdsenv.RetrySupport
     // TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
     TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false")
 )
diff --git a/internal/xds/env/env.go b/internal/xds/env/env.go
index c2e4e5d9718..9d5d47ff2a8 100644
--- a/internal/xds/env/env.go
+++ b/internal/xds/env/env.go
@@ -41,6 +41,7 @@
 
     ringHashSupportEnv        = "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"
     aggregateAndDNSSupportEnv = "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
+    retrySupportEnv           = "GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"
 
     c2pResolverSupportEnv                    = "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER"
     c2pResolverTestOnlyTrafficDirectorURIEnv = "GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI"
@@ -70,6 +71,9 @@ var (
     // "true".
     AggregateAndDNSSupportEnv = strings.EqualFold(os.Getenv(aggregateAndDNSSupportEnv), "true")
 
+    // RetrySupport indicates whether xDS retry is enabled ("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY" is "true").
+    RetrySupport = strings.EqualFold(os.Getenv(retrySupportEnv), "true")
+
     // C2PResolverSupport indicates whether support for C2P resolver is enabled.
     // This can be enabled by setting the environment variable
     // "GRPC_EXPERIMENTAL_GOOGLE_C2P_RESOLVER" to "true".
diff --git a/test/retry_test.go b/test/retry_test.go
index f93c9ac053f..ad1268faa96 100644
--- a/test/retry_test.go
+++ b/test/retry_test.go
@@ -113,7 +113,8 @@ func (s) TestRetryUnary(t *testing.T) {
 }
 
 func (s) TestRetryDisabledByDefault(t *testing.T) {
-    if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") {
+    if strings.EqualFold(os.Getenv("GRPC_GO_RETRY"), "on") ||
+        strings.EqualFold(os.Getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY"), "true") {
         return
     }
     i := -1
diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go
index 27f6aab7ad0..dceea49b3b6 100644
--- a/xds/internal/resolver/serviceconfig.go
+++ b/xds/internal/resolver/serviceconfig.go
@@ -31,6 +31,7 @@ import (
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/internal/grpcrand"
     iresolver "google.golang.org/grpc/internal/resolver"
+    "google.golang.org/grpc/internal/serviceconfig"
     "google.golang.org/grpc/internal/wrr"
     "google.golang.org/grpc/internal/xds/env"
     "google.golang.org/grpc/metadata"
@@ -107,6 +108,8 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
 type virtualHost struct {
     // map from filter name to its config
     httpFilterConfigOverride map[string]httpfilter.FilterConfig
+    // retry policy present in virtual host
+    retryConfig *xdsclient.RetryConfig
 }
 
 // routeCluster holds information about a cluster as referenced by a route.
@@ -122,6 +125,7 @@ type route struct {
     maxStreamDuration time.Duration
     // map from filter name to its config
    httpFilterConfigOverride map[string]httpfilter.FilterConfig
+    retryConfig              *xdsclient.RetryConfig
     hashPolicies             []*xdsclient.HashPolicy
 }
 
@@ -195,10 +199,25 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
     if rt.maxStreamDuration != 0 {
         config.MethodConfig.Timeout = &rt.maxStreamDuration
     }
+    if rt.retryConfig != nil { // a route-level retry policy takes precedence over the virtual host's
+        config.MethodConfig.RetryPolicy = retryConfigToPolicy(rt.retryConfig)
+    } else if cs.virtualHost.retryConfig != nil {
+        config.MethodConfig.RetryPolicy = retryConfigToPolicy(cs.virtualHost.retryConfig)
+    }
 
     return config, nil
 }
 
+func retryConfigToPolicy(config *xdsclient.RetryConfig) *serviceconfig.RetryPolicy {
+    return &serviceconfig.RetryPolicy{
+        MaxAttempts:          int(config.NumRetries) + 1,
+        InitialBackoff:       config.RetryBackoff.BaseInterval,
+        MaxBackoff:           config.RetryBackoff.MaxInterval,
+        BackoffMultiplier:    2,
+        RetryableStatusCodes: config.RetryOn,
+    }
+}
+
 func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies []*xdsclient.HashPolicy) uint64 {
     var hash uint64
     var generatedHash bool
@@ -322,8 +341,11 @@ var newWRR = wrr.NewRandom
 // r.activeClusters for previously-unseen clusters.
 func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
     cs := &configSelector{
-        r:                r,
-        virtualHost:      virtualHost{httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride},
+        r: r,
+        virtualHost: virtualHost{
+            httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
+            retryConfig:              su.virtualHost.RetryConfig,
+        },
         routes:           make([]route, len(su.virtualHost.Routes)),
         clusters:         make(map[string]*clusterInfo),
         httpFilterConfig: su.ldsConfig.httpFilterConfig,
@@ -361,6 +383,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
         }
 
         cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
+        cs.routes[i].retryConfig = rt.RetryConfig
         cs.routes[i].hashPolicies = rt.HashPolicies
     }
 
diff --git a/xds/internal/test/xds_client_integration_test.go b/xds/internal/test/xds_client_integration_test.go
index f4b60e67694..23ea1546935 100644
--- a/xds/internal/test/xds_client_integration_test.go
+++ b/xds/internal/test/xds_client_integration_test.go
@@ -28,10 +28,16 @@ import (
     "testing"
 
     "google.golang.org/grpc"
+    "google.golang.org/grpc/codes"
     "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/internal/stubserver"
+    "google.golang.org/grpc/internal/xds/env"
+    "google.golang.org/grpc/status"
     "google.golang.org/grpc/xds/internal/testutils"
     "google.golang.org/grpc/xds/internal/testutils/e2e"
 
+    v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+    wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
     testpb "google.golang.org/grpc/test/grpc_testing"
 )
 
@@ -42,10 +48,10 @@
 // Returns the following:
 // - the port the server is listening on
 // - cleanup function to be invoked by the tests when done
-func clientSetup(t *testing.T) (uint32, func()) {
+func clientSetup(t *testing.T, tss testpb.TestServiceServer) (uint32, func()) {
     // Initialize a gRPC server and register the stubServer on it.
     server := grpc.NewServer()
-    testpb.RegisterTestServiceServer(server, &testService{})
+    testpb.RegisterTestServiceServer(server, tss)
 
     // Create a local listener and pass it to Serve().
     lis, err := testutils.LocalTCPListener()
@@ -68,7 +74,7 @@ func (s) TestClientSideXDS(t *testing.T) {
     managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
     defer cleanup1()
 
-    port, cleanup2 := clientSetup(t)
+    port, cleanup2 := clientSetup(t, &testService{})
     defer cleanup2()
 
     const serviceName = "my-service-client-side-xds"
@@ -97,3 +103,156 @@
         t.Fatalf("rpc EmptyCall() failed: %v", err)
     }
 }
+
+func (s) TestClientSideRetry(t *testing.T) {
+    if !env.RetrySupport {
+        // Skip this test if retry is not enabled.
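+        // Set GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY=true to run it.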
+        return
+    }
+
+    ctr := 0
+    errs := []codes.Code{codes.ResourceExhausted}
+    ss := &stubserver.StubServer{
+        EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+            defer func() { ctr++ }()
+            if ctr < len(errs) {
+                return nil, status.Errorf(errs[ctr], "this should be retried")
+            }
+            return &testpb.Empty{}, nil
+        },
+    }
+
+    managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
+    defer cleanup1()
+
+    port, cleanup2 := clientSetup(t, ss)
+    defer cleanup2()
+
+    const serviceName = "my-service-client-side-xds"
+    resources := e2e.DefaultClientResources(e2e.ResourceParams{
+        DialTarget: serviceName,
+        NodeID:     nodeID,
+        Host:       "localhost",
+        Port:       port,
+        SecLevel:   e2e.SecurityLevelNone,
+    })
+    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+    defer cancel()
+    if err := managementServer.Update(ctx, resources); err != nil {
+        t.Fatal(err)
+    }
+
+    // Create a ClientConn and make an RPC; with no retry policy installed yet, the server's ResourceExhausted should reach the client.
+    cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
+    if err != nil {
+        t.Fatalf("failed to dial local test server: %v", err)
+    }
+    defer cc.Close()
+
+    client := testpb.NewTestServiceClient(cc)
+    if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.ResourceExhausted {
+        t.Fatalf("rpc EmptyCall() = _, %v; want _, ResourceExhausted", err)
+    }
+
+    testCases := []struct {
+        name        string
+        vhPolicy    *v3routepb.RetryPolicy
+        routePolicy *v3routepb.RetryPolicy
+        errs        []codes.Code // the errors returned by the server for each RPC
+        tryAgainErr codes.Code   // the error that would be returned if we are still using the old retry policies
+        errWant     codes.Code   // the error expected once the new retry policies are applied
+    }{{
+        name: "virtualHost only, fail",
+        vhPolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "resource-exhausted,unavailable",
+            NumRetries: &wrapperspb.UInt32Value{Value: 1},
+        },
+        errs:        []codes.Code{codes.ResourceExhausted, codes.Unavailable},
+        routePolicy: nil,
+        tryAgainErr: codes.ResourceExhausted,
+        errWant:     codes.Unavailable,
+    }, {
+        name: "virtualHost only",
+        vhPolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "resource-exhausted, unavailable",
+            NumRetries: &wrapperspb.UInt32Value{Value: 2},
+        },
+        errs:        []codes.Code{codes.ResourceExhausted, codes.Unavailable},
+        routePolicy: nil,
+        tryAgainErr: codes.Unavailable,
+        errWant:     codes.OK,
+    }, {
+        name: "virtualHost+route, fail",
+        vhPolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "resource-exhausted,unavailable",
+            NumRetries: &wrapperspb.UInt32Value{Value: 2},
+        },
+        routePolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "resource-exhausted",
+            NumRetries: &wrapperspb.UInt32Value{Value: 2},
+        },
+        errs:        []codes.Code{codes.ResourceExhausted, codes.Unavailable},
+        tryAgainErr: codes.OK,
+        errWant:     codes.Unavailable,
+    }, {
+        name: "virtualHost+route",
+        vhPolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "resource-exhausted",
+            NumRetries: &wrapperspb.UInt32Value{Value: 2},
+        },
+        routePolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "unavailable",
+            NumRetries: &wrapperspb.UInt32Value{Value: 2},
+        },
+        errs:        []codes.Code{codes.Unavailable},
+        tryAgainErr: codes.Unavailable,
+        errWant:     codes.OK,
+    }, {
+        name: "virtualHost+route, not enough attempts",
+        vhPolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "unavailable",
+            NumRetries: &wrapperspb.UInt32Value{Value: 2},
+        },
+        routePolicy: &v3routepb.RetryPolicy{
+            RetryOn:    "unavailable",
+            NumRetries: &wrapperspb.UInt32Value{Value: 1},
+        },
+        errs:        []codes.Code{codes.Unavailable, codes.Unavailable},
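+        // With NumRetries=1 at the route level, only two attempts are made, so the second Unavailable reaches the client.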
+        tryAgainErr: codes.OK,
+        errWant:     codes.Unavailable,
+    }}
+
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            errs = tc.errs
+
+            // Confirm tryAgainErr is correct before updating resources.
+            ctr = 0
+            _, err := client.EmptyCall(ctx, &testpb.Empty{})
+            if code := status.Code(err); code != tc.tryAgainErr {
+                t.Fatalf("with old retry policy: EmptyCall() = _, %v; want _, %v", err, tc.tryAgainErr)
+            }
+
+            resources.Routes[0].VirtualHosts[0].RetryPolicy = tc.vhPolicy
+            resources.Routes[0].VirtualHosts[0].Routes[0].GetRoute().RetryPolicy = tc.routePolicy
+            if err := managementServer.Update(ctx, resources); err != nil {
+                t.Fatal(err)
+            }
+
+            // Poll with the RPC: the old policy (tryAgainErr) may still be in
+            // effect until the management server update is processed.
+            for {
+                ctr = 0
+                _, err := client.EmptyCall(ctx, &testpb.Empty{})
+                if code := status.Code(err); code == tc.tryAgainErr {
+                    continue
+                } else if code != tc.errWant {
+                    t.Fatalf("rpc EmptyCall() = _, %v; want _, %v", err, tc.errWant)
+                }
+                break
+            }
+        })
+    }
+}
diff --git a/xds/internal/xdsclient/client.go b/xds/internal/xdsclient/client.go
index 754a025678a..b6310bb7008 100644
--- a/xds/internal/xdsclient/client.go
+++ b/xds/internal/xdsclient/client.go
@@ -33,6 +33,7 @@ import (
     "github.com/golang/protobuf/proto"
     "google.golang.org/protobuf/types/known/anypb"
 
+    "google.golang.org/grpc/codes"
     "google.golang.org/grpc/internal/xds/matcher"
     "google.golang.org/grpc/xds/internal/httpfilter"
     "google.golang.org/grpc/xds/internal/xdsclient/load"
@@ -269,6 +270,24 @@ type VirtualHost struct {
     // may be unused if the matching Route contains an override for that
     // filter.
     HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
+    RetryConfig              *RetryConfig
+}
+
+// RetryConfig contains all retry-related configuration in either a VirtualHost
+// or Route.
+type RetryConfig struct {
+    // RetryOn is a set of status codes on which to retry. Only Canceled,
+    // DeadlineExceeded, Internal, ResourceExhausted, and Unavailable are
+    // supported; any other values will be omitted.
+    RetryOn      map[codes.Code]bool
+    NumRetries   uint32       // maximum number of retry attempts
+    RetryBackoff RetryBackoff // retry backoff policy
+}
+
+// RetryBackoff describes the backoff policy for retries.
+type RetryBackoff struct {
+    BaseInterval time.Duration // initial backoff duration between attempts
+    MaxInterval  time.Duration // maximum backoff duration
 }
 
 // HashPolicyType specifies the type of HashPolicy from a received RDS Response.
@@ -339,6 +358,7 @@ type Route struct {
     // unused if the matching WeightedCluster contains an override for that
     // filter.
     HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
+    RetryConfig              *RetryConfig
 
     RouteAction RouteAction
 }
diff --git a/xds/internal/xdsclient/rds_test.go b/xds/internal/xdsclient/rds_test.go
index 3787ae0ff32..138e3a0bd2b 100644
--- a/xds/internal/xdsclient/rds_test.go
+++ b/xds/internal/xdsclient/rds_test.go
@@ -26,6 +26,7 @@ import (
 
     "github.com/google/go-cmp/cmp"
     "github.com/google/go-cmp/cmp/cmpopts"
+    "google.golang.org/grpc/codes"
     "google.golang.org/grpc/internal/testutils"
     "google.golang.org/grpc/internal/xds/env"
     "google.golang.org/grpc/xds/internal/httpfilter"
@@ -80,6 +81,49 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
             }},
         }
     }
+    goodRouteConfigWithRetryPolicy = func(vhrp *v3routepb.RetryPolicy, rrp *v3routepb.RetryPolicy) *v3routepb.RouteConfiguration {
+        return &v3routepb.RouteConfiguration{
+            Name: routeName,
+            VirtualHosts: []*v3routepb.VirtualHost{{
+                Domains: []string{ldsTarget},
+                Routes: []*v3routepb.Route{{
+                    Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+                    Action: &v3routepb.Route_Route{
+                        Route: &v3routepb.RouteAction{
+                            ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
+                            RetryPolicy:      rrp,
+                        },
+                    },
+                }},
+                RetryPolicy: vhrp,
+            }},
+        }
+    }
+    goodUpdateWithRetryPolicy = func(vhrc *RetryConfig, rrc *RetryConfig) RouteConfigUpdate {
+        if !env.RetrySupport {
+            vhrc = nil
+            rrc = nil
+        }
+        return RouteConfigUpdate{
+            VirtualHosts: []*VirtualHost{{
+                Domains: []string{ldsTarget},
+                Routes: []*Route{{
+                    Prefix:           newStringP("/"),
+                    WeightedClusters: map[string]WeightedCluster{clusterName: {Weight: 1}},
+                    RouteAction:      RouteActionRoute,
+                    RetryConfig:      rrc,
+                }},
+                RetryConfig: vhrc,
+            }},
+        }
+    }
+    defaultRetryBackoff = RetryBackoff{BaseInterval: 25 * time.Millisecond, MaxInterval: 250 * time.Millisecond}
+    goodUpdateIfRetryDisabled = func() RouteConfigUpdate {
+        if env.RetrySupport {
+            return RouteConfigUpdate{}
+        }
+        return goodUpdateWithRetryPolicy(nil, nil)
+    }
 )
 
 tests := []struct {
@@ -485,8 +529,50 @@
             rc:         goodRouteConfigWithFilterConfigs(map[string]*anypb.Any{"foo": wrappedOptionalFilter("unknown.custom.filter")}),
             wantUpdate: goodUpdateWithFilterConfigs(nil),
         },
+        {
+            name: "good-route-config-with-retry-policy",
+            rc: goodRouteConfigWithRetryPolicy(
+                &v3routepb.RetryPolicy{RetryOn: "cancelled"},
+                &v3routepb.RetryPolicy{RetryOn: "deadline-exceeded,unsupported", NumRetries: &wrapperspb.UInt32Value{Value: 2}}),
+            wantUpdate: goodUpdateWithRetryPolicy(
+                &RetryConfig{RetryOn: map[codes.Code]bool{codes.Canceled: true}, NumRetries: 1, RetryBackoff: defaultRetryBackoff},
+                &RetryConfig{RetryOn: map[codes.Code]bool{codes.DeadlineExceeded: true}, NumRetries: 2, RetryBackoff: defaultRetryBackoff}),
+        },
+        {
+            name: "good-route-config-with-retry-backoff",
+            rc: goodRouteConfigWithRetryPolicy(
+                &v3routepb.RetryPolicy{RetryOn: "internal", RetryBackOff: &v3routepb.RetryPolicy_RetryBackOff{BaseInterval: durationpb.New(10 * time.Millisecond), MaxInterval: durationpb.New(10 * time.Millisecond)}},
+                &v3routepb.RetryPolicy{RetryOn: "resource-exhausted", RetryBackOff: &v3routepb.RetryPolicy_RetryBackOff{BaseInterval: durationpb.New(10 * time.Millisecond)}}),
+            wantUpdate: goodUpdateWithRetryPolicy(
+                &RetryConfig{RetryOn: map[codes.Code]bool{codes.Internal: true}, NumRetries: 1, RetryBackoff: RetryBackoff{BaseInterval: 10 * time.Millisecond, MaxInterval: 10 * time.Millisecond}},
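+                // The route-level policy omits max_interval, so generateRetryConfig defaults it to 10x the 10ms base_interval (100ms).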
+                &RetryConfig{RetryOn: map[codes.Code]bool{codes.ResourceExhausted: true}, NumRetries: 1, RetryBackoff: RetryBackoff{BaseInterval: 10 * time.Millisecond, MaxInterval: 100 * time.Millisecond}}),
+        },
+        {
+            name:       "bad-retry-policy-0-retries",
+            rc:         goodRouteConfigWithRetryPolicy(&v3routepb.RetryPolicy{RetryOn: "cancelled", NumRetries: &wrapperspb.UInt32Value{Value: 0}}, nil),
+            wantUpdate: goodUpdateIfRetryDisabled(),
+            wantError:  env.RetrySupport,
+        },
+        {
+            name:       "bad-retry-policy-0-base-interval",
+            rc:         goodRouteConfigWithRetryPolicy(&v3routepb.RetryPolicy{RetryOn: "cancelled", RetryBackOff: &v3routepb.RetryPolicy_RetryBackOff{BaseInterval: durationpb.New(0)}}, nil),
+            wantUpdate: goodUpdateIfRetryDisabled(),
+            wantError:  env.RetrySupport,
+        },
+        {
+            name:       "bad-retry-policy-negative-max-interval",
+            rc:         goodRouteConfigWithRetryPolicy(&v3routepb.RetryPolicy{RetryOn: "cancelled", RetryBackOff: &v3routepb.RetryPolicy_RetryBackOff{MaxInterval: durationpb.New(-time.Second)}}, nil),
+            wantUpdate: goodUpdateIfRetryDisabled(),
+            wantError:  env.RetrySupport,
+        },
+        {
+            name:       "bad-retry-policy-negative-max-interval-no-known-retry-on",
+            rc:         goodRouteConfigWithRetryPolicy(&v3routepb.RetryPolicy{RetryOn: "something", RetryBackOff: &v3routepb.RetryPolicy_RetryBackOff{MaxInterval: durationpb.New(-time.Second)}}, nil),
+            wantUpdate: goodUpdateIfRetryDisabled(),
+            wantError:  env.RetrySupport,
+        },
     }
-
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
             gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, nil, false)
diff --git a/xds/internal/xdsclient/xds.go b/xds/internal/xdsclient/xds.go
index bfb2e3261a0..3d9148eebcd 100644
--- a/xds/internal/xdsclient/xds.go
+++ b/xds/internal/xdsclient/xds.go
@@ -39,6 +39,7 @@
     v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
     "github.com/golang/protobuf/proto"
     "github.com/golang/protobuf/ptypes"
+    "google.golang.org/grpc/codes"
     "google.golang.org/grpc/internal/pretty"
     "google.golang.org/grpc/internal/xds/matcher"
     "google.golang.org/protobuf/types/known/anypb"
@@ -344,9 +345,14 @@ func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, l
         if err != nil {
             return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
         }
+        retryCfg, err := generateRetryConfig(vh.GetRetryPolicy())
+        if err != nil {
+            return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
+        }
         vhOut := &VirtualHost{
-            Domains: vh.GetDomains(),
-            Routes:  routes,
+            Domains:     vh.GetDomains(),
+            Routes:      routes,
+            RetryConfig: retryCfg,
         }
         if !v2 {
             cfgs, err := processHTTPFilterOverrides(vh.GetTypedPerFilterConfig())
@@ -360,6 +366,60 @@
     return RouteConfigUpdate{VirtualHosts: vhs}, nil
 }
 
+func generateRetryConfig(rp *v3routepb.RetryPolicy) (*RetryConfig, error) {
+    if !env.RetrySupport || rp == nil {
+        return nil, nil
+    }
+
+    cfg := &RetryConfig{RetryOn: make(map[codes.Code]bool)}
+    for _, s := range strings.Split(rp.GetRetryOn(), ",") {
+        switch strings.TrimSpace(strings.ToLower(s)) {
+        case "cancelled":
+            cfg.RetryOn[codes.Canceled] = true
+        case "deadline-exceeded":
+            cfg.RetryOn[codes.DeadlineExceeded] = true
+        case "internal":
+            cfg.RetryOn[codes.Internal] = true
+        case "resource-exhausted":
+            cfg.RetryOn[codes.ResourceExhausted] = true
+        case "unavailable":
+            cfg.RetryOn[codes.Unavailable] = true
+        }
+    }
+
+    if rp.NumRetries == nil {
+        cfg.NumRetries = 1
+    } else {
+        cfg.NumRetries = rp.GetNumRetries().Value
+        if cfg.NumRetries < 1 {
+            return nil, fmt.Errorf("retry_policy.num_retries = %v; must be >= 1", cfg.NumRetries)
fmt.Errorf("retry_policy.num_retries = %v; must be >= 1", cfg.NumRetries) + } + } + + backoff := rp.GetRetryBackOff() + if backoff == nil { + cfg.RetryBackoff.BaseInterval = 25 * time.Millisecond + } else { + cfg.RetryBackoff.BaseInterval = backoff.GetBaseInterval().AsDuration() + if cfg.RetryBackoff.BaseInterval <= 0 { + return nil, fmt.Errorf("retry_policy.base_interval = %v; must be > 0", cfg.RetryBackoff.BaseInterval) + } + } + if max := backoff.GetMaxInterval(); max == nil { + cfg.RetryBackoff.MaxInterval = 10 * cfg.RetryBackoff.BaseInterval + } else { + cfg.RetryBackoff.MaxInterval = max.AsDuration() + if cfg.RetryBackoff.MaxInterval <= 0 { + return nil, fmt.Errorf("retry_policy.max_interval = %v; must be > 0", cfg.RetryBackoff.MaxInterval) + } + } + + if len(cfg.RetryOn) == 0 { + return &RetryConfig{}, nil + } + return cfg, nil +} + func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, v2 bool) ([]*Route, error) { var routesRet []*Route for _, r := range routes { @@ -507,7 +567,15 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger, d := dur.AsDuration() route.MaxStreamDuration = &d } + + var err error + route.RetryConfig, err = generateRetryConfig(action.GetRetryPolicy()) + if err != nil { + return nil, fmt.Errorf("route %+v, action %+v: %v", r, action, err) + } + route.RouteAction = RouteActionRoute + case *v3routepb.Route_NonForwardingAction: // Expected to be used on server side. route.RouteAction = RouteActionNonForwardingAction