Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-xds: Implement retry support #2280

Merged
merged 1 commit into from Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/src/environment.ts
Expand Up @@ -16,4 +16,5 @@
*/

export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export const EXPERIMENTAL_RETRY = process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY === 'true';
54 changes: 50 additions & 4 deletions packages/grpc-js-xds/src/resolver-xds.ts
Expand Up @@ -44,9 +44,10 @@ import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resourc
import Duration = experimental.Duration;
import { Duration__Output } from './generated/google/protobuf/Duration';
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
import { EXPERIMENTAL_FAULT_INJECTION } from './environment';
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment';
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import RetryPolicy = experimental.RetryPolicy;

const TRACER_NAME = 'xds_resolver';

Expand Down Expand Up @@ -199,6 +200,24 @@ function protoDurationToDuration(duration: Duration__Output): Duration {
}
}

function protoDurationToSecondsString(duration: Duration__Output): string {
return `${duration.seconds + duration.nanos / 1_000_000_000}s`;
}

const DEFAULT_RETRY_BASE_INTERVAL = '0.025s'

function getDefaultRetryMaxInterval(baseInterval: string): string {
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
}

const RETRY_CODES: {[key: string]: status} = {
'cancelled': status.CANCELLED,
'deadline-exceeded': status.DEADLINE_EXCEEDED,
'internal': status.INTERNAL,
'resource-exhausted': status.RESOURCE_EXHAUSTED,
'unavailable': status.UNAVAILABLE
};

class XdsResolver implements Resolver {
private hasReportedSuccess = false;

Expand Down Expand Up @@ -363,6 +382,33 @@ class XdsResolver implements Resolver {
}
}
}
let retryPolicy: RetryPolicy | undefined = undefined;
if (EXPERIMENTAL_RETRY) {
const retryConfig = route.route!.retry_policy ?? virtualHost.retry_policy;
if (retryConfig) {
const retryableStatusCodes = [];
for (const code of retryConfig.retry_on.split(',')) {
if (RETRY_CODES[code]) {
retryableStatusCodes.push(RETRY_CODES[code]);
}
}
if (retryableStatusCodes.length > 0) {
const baseInterval = retryConfig.retry_back_off?.base_interval ?
protoDurationToSecondsString(retryConfig.retry_back_off.base_interval) :
DEFAULT_RETRY_BASE_INTERVAL;
const maxInterval = retryConfig.retry_back_off?.max_interval ?
protoDurationToSecondsString(retryConfig.retry_back_off.max_interval) :
getDefaultRetryMaxInterval(baseInterval);
retryPolicy = {
backoffMultiplier: 2,
initialBackoff: baseInterval,
maxBackoff: maxInterval,
maxAttempts: (retryConfig.num_retries?.value ?? 1) + 1,
retryableStatusCodes: retryableStatusCodes
};
}
}
}
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
Expand Down Expand Up @@ -390,7 +436,7 @@ class XdsResolver implements Resolver {
}
}
}
routeAction = new SingleClusterRouteAction(cluster, timeout, extraFilterFactories);
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories);
break;
}
case 'weighted_clusters': {
Expand Down Expand Up @@ -432,7 +478,7 @@ class XdsResolver implements Resolver {
}
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories});
}
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout);
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy});
break;
}
default:
Expand Down Expand Up @@ -470,7 +516,7 @@ class XdsResolver implements Resolver {
this.unrefCluster(clusterResult.name);
}
return {
methodConfig: {name: [], timeout: action.getTimeout()},
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name},
status: status.OK,
Expand Down
31 changes: 9 additions & 22 deletions packages/grpc-js-xds/src/route-action.ts
Expand Up @@ -18,16 +18,17 @@ import { experimental } from '@grpc/grpc-js';
import Duration = experimental.Duration;
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import MethodConfig = experimental.MethodConfig;

export interface ClusterResult {
name: string;
methodConfig: MethodConfig;
dynamicFilterFactories: FilterFactory<Filter>[];
}

export interface RouteAction {
toString(): string;
getCluster(): ClusterResult;
getTimeout(): Duration | undefined;
}

function durationToLogString(duration: Duration) {
Expand All @@ -40,25 +41,18 @@ function durationToLogString(duration: Duration) {
}

export class SingleClusterRouteAction implements RouteAction {
constructor(private cluster: string, private timeout: Duration | undefined, private extraFilterFactories: FilterFactory<Filter>[]) {}
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[]) {}

getCluster() {
return {
name: this.cluster,
methodConfig: this.methodConfig,
dynamicFilterFactories: this.extraFilterFactories
};
}

toString() {
if (this.timeout) {
return 'SingleCluster(' + this.cluster + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
} else {
return 'SingleCluster(' + this.cluster + ')';
}
}

getTimeout() {
return this.timeout;
return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')';
}
}

Expand All @@ -79,7 +73,7 @@ export class WeightedClusterRouteAction implements RouteAction {
* The weighted cluster choices represented as a CDF
*/
private clusterChoices: ClusterChoice[];
constructor(private clusters: WeightedCluster[], private totalWeight: number, private timeout: Duration | undefined) {
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) {
this.clusterChoices = [];
let lastNumerator = 0;
for (const clusterWeight of clusters) {
Expand All @@ -94,24 +88,17 @@ export class WeightedClusterRouteAction implements RouteAction {
if (randomNumber < choice.numerator) {
return {
name: choice.name,
methodConfig: this.methodConfig,
dynamicFilterFactories: choice.dynamicFilterFactories
};
}
}
// This should be prevented by the validation rules
return {name: '', dynamicFilterFactories: []};
return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []};
}

toString() {
const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ')
if (this.timeout) {
return 'WeightedCluster(' + clusterListString + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
} else {
return 'WeightedCluster(' + clusterListString + ')';
}
}

getTimeout() {
return this.timeout;
return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')';
}
}
43 changes: 42 additions & 1 deletion packages/grpc-js-xds/src/xds-stream-state/rds-state.ts
Expand Up @@ -15,8 +15,10 @@
*
*/

import { EXPERIMENTAL_FAULT_INJECTION } from "../environment";
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from "../environment";
import { RetryPolicy__Output } from "../generated/envoy/config/route/v3/RetryPolicy";
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { validateOverrideFilter } from "../http-filter";
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";

Expand All @@ -30,6 +32,13 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];

function durationToMs(duration: Duration__Output | null): number | null {
if (duration === null) {
return null;
}
return (Number.parseInt(duration.seconds) * 1000 + duration.nanos / 1_000_000) | 0;
}

export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> implements XdsStreamState<RouteConfiguration__Output> {
protected isStateOfTheWorld(): boolean {
return false;
Expand All @@ -40,6 +49,28 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
protected getProtocolName(): string {
return 'RDS';
}

private validateRetryPolicy(policy: RetryPolicy__Output | null): boolean {
if (policy === null) {
return true;
}
const numRetries = policy.num_retries?.value ?? 1
if (numRetries < 1) {
return false;
}
if (policy.retry_back_off) {
if (!policy.retry_back_off.base_interval) {
return false;
}
const baseInterval = durationToMs(policy.retry_back_off.base_interval)!;
const maxInterval = durationToMs(policy.retry_back_off.max_interval) ?? (10 * baseInterval);
if (!(maxInterval >= baseInterval) && (baseInterval > 0)) {
return false;
}
}
return true;
}

validateResponse(message: RouteConfiguration__Output): boolean {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
Expand All @@ -62,6 +93,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(virtualHost.retry_policy)) {
return false;
}
}
for (const route of virtualHost.routes) {
const match = route.match;
if (!match) {
Expand All @@ -88,6 +124,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(route.route.retry_policy)) {
return false;
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
if (route.route.weighted_clusters!.total_weight?.value === 0) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js/src/experimental.ts
Expand Up @@ -7,7 +7,7 @@ export {
} from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { Duration, durationToMs } from './duration';
export { ServiceConfig } from './service-config';
export { ServiceConfig, MethodConfig, RetryPolicy } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export {
LoadBalancer,
Expand Down