Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Implement retries #2278

Merged
merged 9 commits into from Nov 17, 2022
3 changes: 3 additions & 0 deletions packages/grpc-js/README.md
Expand Up @@ -59,6 +59,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.default_compression_algorithm`
- `grpc.enable_channelz`
- `grpc.dns_min_time_between_resolutions_ms`
- `grpc.enable_retries`
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`
Expand Down
11 changes: 11 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Expand Up @@ -44,6 +44,14 @@ export interface ChannelOptions {
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
'grpc.enable_channelz'?: number;
'grpc.dns_min_time_between_resolutions_ms'?: number;
'grpc.enable_retries'?: number;
'grpc.per_rpc_retry_buffer_size'?: number;
/* This option is pattered like a core option, but the core does not have
* this option. It is closely related to the option
* grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
* implement this functionality using the ResourceQuota mechanism, so there
* will probably not be any collision or other inconsistency. */
'grpc.retry_buffer_size'?: number;
'grpc.max_connection_age_ms'?: number;
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
Expand Down Expand Up @@ -73,6 +81,9 @@ export const recognizedOptions = {
'grpc.enable_http_proxy': true,
'grpc.enable_channelz': true,
'grpc.dns_min_time_between_resolutions_ms': true,
'grpc.enable_retries': true,
'grpc.per_rpc_retry_buffer_size': true,
'grpc.retry_buffer_size': true,
'grpc.max_connection_age_ms': true,
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
Expand Down
45 changes: 42 additions & 3 deletions packages/grpc-js/src/internal-channel.ts
Expand Up @@ -50,6 +50,7 @@ import { Deadline, getDeadlineTimeoutString } from './deadline';
import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';

/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
Expand Down Expand Up @@ -78,6 +79,11 @@ interface ErrorConfigResult {

type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;

const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();

const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB

export class InternalChannel {

private resolvingLoadBalancer: ResolvingLoadBalancer;
Expand Down Expand Up @@ -111,6 +117,7 @@ export class InternalChannel {
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;

// Channelz info
private readonly channelzEnabled: boolean = true;
Expand Down Expand Up @@ -179,6 +186,10 @@ export class InternalChannel {
this.subchannelPool = getSubchannelPool(
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
);
this.retryBufferTracker = new MessageBufferTracker(
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
Expand Down Expand Up @@ -226,7 +237,12 @@ export class InternalChannel {
this.target,
channelControlHelper,
options,
(configSelector) => {
(serviceConfig, configSelector) => {
if (serviceConfig.retryThrottling) {
RETRY_THROTTLER_MAP.set(this.getTarget(), new RetryThrottler(serviceConfig.retryThrottling.maxTokens, serviceConfig.retryThrottling.tokenRatio, RETRY_THROTTLER_MAP.get(this.getTarget())));
} else {
RETRY_THROTTLER_MAP.delete(this.getTarget());
}
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
Expand All @@ -243,6 +259,7 @@ export class InternalChannel {
}
this.configSelectionQueue = [];
});

},
(status) => {
if (this.channelzEnabled) {
Expand Down Expand Up @@ -405,6 +422,24 @@ export class InternalChannel {
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
}

createRetryingCall(
callConfig: CallConfig,
method: string,
host: string,
credentials: CallCredentials,
deadline: Deadline
): RetryingCall {
const callNumber = getNextCallNumber();
this.trace(
'createRetryingCall [' +
callNumber +
'] method="' +
method +
'"'
);
return new RetryingCall(this, callConfig, method, host, credentials, deadline, callNumber, this.retryBufferTracker, RETRY_THROTTLER_MAP.get(this.getTarget()))
}

createInnerCall(
callConfig: CallConfig,
method: string,
Expand All @@ -413,7 +448,11 @@ export class InternalChannel {
deadline: Deadline
): Call {
// Create a RetryingCall if retries are enabled
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
if (this.options['grpc.enable_retries'] === 0) {
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
} else {
return this.createRetryingCall(callConfig, method, host, credentials, deadline);
}
}

createResolvingCall(
Expand All @@ -439,7 +478,7 @@ export class InternalChannel {
parentCall: parentCall,
};

const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), getNextCallNumber());
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), callNumber);

if (this.channelzEnabled) {
this.callTracker.addCallStarted();
Expand Down
16 changes: 14 additions & 2 deletions packages/grpc-js/src/load-balancing-call.ts
Expand Up @@ -29,6 +29,7 @@ import { CallConfig } from "./resolver";
import { splitHostPort } from "./uri-parser";
import * as logging from './logging';
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import * as http2 from 'http2';

const TRACER_NAME = 'load_balancing_call';

Expand All @@ -38,6 +39,10 @@ export interface StatusObjectWithProgress extends StatusObject {
progress: RpcProgress;
}

export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
onReceiveStatus(status: StatusObjectWithProgress): void;
}

export class LoadBalancingCall implements Call {
private child: SubchannelCall | null = null;
private readPending = false;
Expand Down Expand Up @@ -145,13 +150,20 @@ export class LoadBalancingCall implements Call {
try {
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
onReceiveMetadata: metadata => {
this.trace('Received metadata');
this.listener!.onReceiveMetadata(metadata);
},
onReceiveMessage: message => {
this.trace('Received message');
this.listener!.onReceiveMessage(message);
},
onReceiveStatus: status => {
this.outputStatus(status, 'PROCESSED');
this.trace('Received status');
if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
this.outputStatus(status, 'REFUSED');
} else {
this.outputStatus(status, 'PROCESSED');
}
}
});
} catch (error) {
Expand Down Expand Up @@ -226,7 +238,7 @@ export class LoadBalancingCall implements Call {
getPeer(): string {
return this.child?.getPeer() ?? this.channel.getTarget();
}
start(metadata: Metadata, listener: InterceptingListener): void {
start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
this.trace('start called');
this.listener = listener;
this.metadata = metadata;
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js/src/resolving-load-balancer.ts
Expand Up @@ -83,7 +83,7 @@ function getDefaultConfigSelector(
}

export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
(serviceConfig: ServiceConfig, configSelector: ConfigSelector): void;
}

export interface ResolutionFailureCallback {
Expand Down Expand Up @@ -239,6 +239,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(
finalServiceConfig,
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
);
},
Expand Down