Skip to content

Commit

Permalink
Merge pull request #2280 from murgatroid99/grpc-js-xds_retry
Browse files Browse the repository at this point in the history
grpc-js-xds: Implement retry support
  • Loading branch information
murgatroid99 committed Nov 30, 2022
2 parents 271c848 + edf612a commit 9214988
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 29 deletions.
3 changes: 2 additions & 1 deletion packages/grpc-js-xds/src/environment.ts
Expand Up @@ -16,4 +16,5 @@
*/

export const EXPERIMENTAL_FAULT_INJECTION = (process.env.GRPC_XDS_EXPERIMENTAL_FAULT_INJECTION ?? 'true') === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export const EXPERIMENTAL_OUTLIER_DETECTION = (process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION ?? 'true') === 'true';
export const EXPERIMENTAL_RETRY = process.env.GRPC_XDS_EXPERIMENTAL_ENABLE_RETRY === 'true';
54 changes: 50 additions & 4 deletions packages/grpc-js-xds/src/resolver-xds.ts
Expand Up @@ -44,9 +44,10 @@ import { decodeSingleResource, HTTP_CONNECTION_MANGER_TYPE_URL } from './resourc
import Duration = experimental.Duration;
import { Duration__Output } from './generated/google/protobuf/Duration';
import { createHttpFilter, HttpFilterConfig, parseOverrideFilterConfig, parseTopLevelFilterConfig } from './http-filter';
import { EXPERIMENTAL_FAULT_INJECTION } from './environment';
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from './environment';
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import RetryPolicy = experimental.RetryPolicy;

const TRACER_NAME = 'xds_resolver';

Expand Down Expand Up @@ -199,6 +200,24 @@ function protoDurationToDuration(duration: Duration__Output): Duration {
}
}

function protoDurationToSecondsString(duration: Duration__Output): string {
return `${duration.seconds + duration.nanos / 1_000_000_000}s`;
}

const DEFAULT_RETRY_BASE_INTERVAL = '0.025s'

function getDefaultRetryMaxInterval(baseInterval: string): string {
return `${Number.parseFloat(baseInterval.substring(0, baseInterval.length - 1)) * 10}s`;
}

const RETRY_CODES: {[key: string]: status} = {
'cancelled': status.CANCELLED,
'deadline-exceeded': status.DEADLINE_EXCEEDED,
'internal': status.INTERNAL,
'resource-exhausted': status.RESOURCE_EXHAUSTED,
'unavailable': status.UNAVAILABLE
};

class XdsResolver implements Resolver {
private hasReportedSuccess = false;

Expand Down Expand Up @@ -363,6 +382,33 @@ class XdsResolver implements Resolver {
}
}
}
let retryPolicy: RetryPolicy | undefined = undefined;
if (EXPERIMENTAL_RETRY) {
const retryConfig = route.route!.retry_policy ?? virtualHost.retry_policy;
if (retryConfig) {
const retryableStatusCodes = [];
for (const code of retryConfig.retry_on.split(',')) {
if (RETRY_CODES[code]) {
retryableStatusCodes.push(RETRY_CODES[code]);
}
}
if (retryableStatusCodes.length > 0) {
const baseInterval = retryConfig.retry_back_off?.base_interval ?
protoDurationToSecondsString(retryConfig.retry_back_off.base_interval) :
DEFAULT_RETRY_BASE_INTERVAL;
const maxInterval = retryConfig.retry_back_off?.max_interval ?
protoDurationToSecondsString(retryConfig.retry_back_off.max_interval) :
getDefaultRetryMaxInterval(baseInterval);
retryPolicy = {
backoffMultiplier: 2,
initialBackoff: baseInterval,
maxBackoff: maxInterval,
maxAttempts: (retryConfig.num_retries?.value ?? 1) + 1,
retryableStatusCodes: retryableStatusCodes
};
}
}
}
switch (route.route!.cluster_specifier) {
case 'cluster_header':
continue;
Expand Down Expand Up @@ -390,7 +436,7 @@ class XdsResolver implements Resolver {
}
}
}
routeAction = new SingleClusterRouteAction(cluster, timeout, extraFilterFactories);
routeAction = new SingleClusterRouteAction(cluster, {name: [], timeout: timeout, retryPolicy: retryPolicy}, extraFilterFactories);
break;
}
case 'weighted_clusters': {
Expand Down Expand Up @@ -432,7 +478,7 @@ class XdsResolver implements Resolver {
}
weightedClusters.push({name: clusterWeight.name, weight: clusterWeight.weight?.value ?? 0, dynamicFilterFactories: extraFilterFactories});
}
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, timeout);
routeAction = new WeightedClusterRouteAction(weightedClusters, route.route!.weighted_clusters!.total_weight?.value ?? 100, {name: [], timeout: timeout, retryPolicy: retryPolicy});
break;
}
default:
Expand Down Expand Up @@ -470,7 +516,7 @@ class XdsResolver implements Resolver {
this.unrefCluster(clusterResult.name);
}
return {
methodConfig: {name: [], timeout: action.getTimeout()},
methodConfig: clusterResult.methodConfig,
onCommitted: onCommitted,
pickInformation: {cluster: clusterResult.name},
status: status.OK,
Expand Down
31 changes: 9 additions & 22 deletions packages/grpc-js-xds/src/route-action.ts
Expand Up @@ -18,16 +18,17 @@ import { experimental } from '@grpc/grpc-js';
import Duration = experimental.Duration;
import Filter = experimental.Filter;
import FilterFactory = experimental.FilterFactory;
import MethodConfig = experimental.MethodConfig;

export interface ClusterResult {
name: string;
methodConfig: MethodConfig;
dynamicFilterFactories: FilterFactory<Filter>[];
}

export interface RouteAction {
toString(): string;
getCluster(): ClusterResult;
getTimeout(): Duration | undefined;
}

function durationToLogString(duration: Duration) {
Expand All @@ -40,25 +41,18 @@ function durationToLogString(duration: Duration) {
}

export class SingleClusterRouteAction implements RouteAction {
constructor(private cluster: string, private timeout: Duration | undefined, private extraFilterFactories: FilterFactory<Filter>[]) {}
constructor(private cluster: string, private methodConfig: MethodConfig, private extraFilterFactories: FilterFactory<Filter>[]) {}

getCluster() {
return {
name: this.cluster,
methodConfig: this.methodConfig,
dynamicFilterFactories: this.extraFilterFactories
};
}

toString() {
if (this.timeout) {
return 'SingleCluster(' + this.cluster + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
} else {
return 'SingleCluster(' + this.cluster + ')';
}
}

getTimeout() {
return this.timeout;
return 'SingleCluster(' + this.cluster + ', ' + JSON.stringify(this.methodConfig) + ')';
}
}

Expand All @@ -79,7 +73,7 @@ export class WeightedClusterRouteAction implements RouteAction {
* The weighted cluster choices represented as a CDF
*/
private clusterChoices: ClusterChoice[];
constructor(private clusters: WeightedCluster[], private totalWeight: number, private timeout: Duration | undefined) {
constructor(private clusters: WeightedCluster[], private totalWeight: number, private methodConfig: MethodConfig) {
this.clusterChoices = [];
let lastNumerator = 0;
for (const clusterWeight of clusters) {
Expand All @@ -94,24 +88,17 @@ export class WeightedClusterRouteAction implements RouteAction {
if (randomNumber < choice.numerator) {
return {
name: choice.name,
methodConfig: this.methodConfig,
dynamicFilterFactories: choice.dynamicFilterFactories
};
}
}
// This should be prevented by the validation rules
return {name: '', dynamicFilterFactories: []};
return {name: '', methodConfig: this.methodConfig, dynamicFilterFactories: []};
}

toString() {
const clusterListString = this.clusters.map(({name, weight}) => '(' + name + ':' + weight + ')').join(', ')
if (this.timeout) {
return 'WeightedCluster(' + clusterListString + ', ' + 'timeout=' + durationToLogString(this.timeout) + 's)';
} else {
return 'WeightedCluster(' + clusterListString + ')';
}
}

getTimeout() {
return this.timeout;
return 'WeightedCluster(' + clusterListString + ', ' + JSON.stringify(this.methodConfig) + ')';
}
}
43 changes: 42 additions & 1 deletion packages/grpc-js-xds/src/xds-stream-state/rds-state.ts
Expand Up @@ -15,8 +15,10 @@
*
*/

import { EXPERIMENTAL_FAULT_INJECTION } from "../environment";
import { EXPERIMENTAL_FAULT_INJECTION, EXPERIMENTAL_RETRY } from "../environment";
import { RetryPolicy__Output } from "../generated/envoy/config/route/v3/RetryPolicy";
import { RouteConfiguration__Output } from "../generated/envoy/config/route/v3/RouteConfiguration";
import { Duration__Output } from "../generated/google/protobuf/Duration";
import { validateOverrideFilter } from "../http-filter";
import { BaseXdsStreamState, XdsStreamState } from "./xds-stream-state";

Expand All @@ -30,6 +32,13 @@ const SUPPPORTED_HEADER_MATCH_SPECIFIERS = [
'suffix_match'];
const SUPPORTED_CLUSTER_SPECIFIERS = ['cluster', 'weighted_clusters', 'cluster_header'];

function durationToMs(duration: Duration__Output | null): number | null {
if (duration === null) {
return null;
}
return (Number.parseInt(duration.seconds) * 1000 + duration.nanos / 1_000_000) | 0;
}

export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> implements XdsStreamState<RouteConfiguration__Output> {
protected isStateOfTheWorld(): boolean {
return false;
Expand All @@ -40,6 +49,28 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
protected getProtocolName(): string {
return 'RDS';
}

private validateRetryPolicy(policy: RetryPolicy__Output | null): boolean {
if (policy === null) {
return true;
}
const numRetries = policy.num_retries?.value ?? 1
if (numRetries < 1) {
return false;
}
if (policy.retry_back_off) {
if (!policy.retry_back_off.base_interval) {
return false;
}
const baseInterval = durationToMs(policy.retry_back_off.base_interval)!;
const maxInterval = durationToMs(policy.retry_back_off.max_interval) ?? (10 * baseInterval);
if (!(maxInterval >= baseInterval) && (baseInterval > 0)) {
return false;
}
}
return true;
}

validateResponse(message: RouteConfiguration__Output): boolean {
// https://github.com/grpc/proposal/blob/master/A28-xds-traffic-splitting-and-routing.md#response-validation
for (const virtualHost of message.virtual_hosts) {
Expand All @@ -62,6 +93,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(virtualHost.retry_policy)) {
return false;
}
}
for (const route of virtualHost.routes) {
const match = route.match;
if (!match) {
Expand All @@ -88,6 +124,11 @@ export class RdsState extends BaseXdsStreamState<RouteConfiguration__Output> imp
}
}
}
if (EXPERIMENTAL_RETRY) {
if (!this.validateRetryPolicy(route.route.retry_policy)) {
return false;
}
}
if (route.route!.cluster_specifier === 'weighted_clusters') {
if (route.route.weighted_clusters!.total_weight?.value === 0) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion packages/grpc-js/src/experimental.ts
Expand Up @@ -7,7 +7,7 @@ export {
} from './resolver';
export { GrpcUri, uriToString } from './uri-parser';
export { Duration, durationToMs } from './duration';
export { ServiceConfig } from './service-config';
export { ServiceConfig, MethodConfig, RetryPolicy } from './service-config';
export { BackoffTimeout } from './backoff-timeout';
export {
LoadBalancer,
Expand Down

0 comments on commit 9214988

Please sign in to comment.