Skip to content

Commit

Permalink
Merge pull request #2278 from murgatroid99/grpc-js_retry2
Browse files Browse the repository at this point in the history
grpc-js: Implement retries
  • Loading branch information
murgatroid99 committed Nov 17, 2022
2 parents 01c5ee7 + fa21e13 commit 89e132a
Show file tree
Hide file tree
Showing 12 changed files with 1,506 additions and 28 deletions.
3 changes: 3 additions & 0 deletions packages/grpc-js/README.md
Expand Up @@ -59,6 +59,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.default_compression_algorithm`
- `grpc.enable_channelz`
- `grpc.dns_min_time_between_resolutions_ms`
- `grpc.enable_retries`
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`
Expand Down
11 changes: 11 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Expand Up @@ -44,6 +44,14 @@ export interface ChannelOptions {
'grpc.default_compression_algorithm'?: CompressionAlgorithms;
'grpc.enable_channelz'?: number;
'grpc.dns_min_time_between_resolutions_ms'?: number;
'grpc.enable_retries'?: number;
'grpc.per_rpc_retry_buffer_size'?: number;
/* This option is pattered like a core option, but the core does not have
* this option. It is closely related to the option
* grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
* implement this functionality using the ResourceQuota mechanism, so there
* will probably not be any collision or other inconsistency. */
'grpc.retry_buffer_size'?: number;
'grpc.max_connection_age_ms'?: number;
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
Expand Down Expand Up @@ -73,6 +81,9 @@ export const recognizedOptions = {
'grpc.enable_http_proxy': true,
'grpc.enable_channelz': true,
'grpc.dns_min_time_between_resolutions_ms': true,
'grpc.enable_retries': true,
'grpc.per_rpc_retry_buffer_size': true,
'grpc.retry_buffer_size': true,
'grpc.max_connection_age_ms': true,
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
Expand Down
45 changes: 42 additions & 3 deletions packages/grpc-js/src/internal-channel.ts
Expand Up @@ -50,6 +50,7 @@ import { Deadline, getDeadlineTimeoutString } from './deadline';
import { ResolvingCall } from './resolving-call';
import { getNextCallNumber } from './call-number';
import { restrictControlPlaneStatusCode } from './control-plane-status';
import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';

/**
* See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
Expand Down Expand Up @@ -78,6 +79,11 @@ interface ErrorConfigResult {

type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;

const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map();

const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB

export class InternalChannel {

private resolvingLoadBalancer: ResolvingLoadBalancer;
Expand Down Expand Up @@ -111,6 +117,7 @@ export class InternalChannel {
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;

// Channelz info
private readonly channelzEnabled: boolean = true;
Expand Down Expand Up @@ -179,6 +186,10 @@ export class InternalChannel {
this.subchannelPool = getSubchannelPool(
(options['grpc.use_local_subchannel_pool'] ?? 0) === 0
);
this.retryBufferTracker = new MessageBufferTracker(
options['grpc.retry_buffer_size'] ?? DEFAULT_RETRY_BUFFER_SIZE_BYTES,
options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
Expand Down Expand Up @@ -226,7 +237,12 @@ export class InternalChannel {
this.target,
channelControlHelper,
options,
(configSelector) => {
(serviceConfig, configSelector) => {
if (serviceConfig.retryThrottling) {
RETRY_THROTTLER_MAP.set(this.getTarget(), new RetryThrottler(serviceConfig.retryThrottling.maxTokens, serviceConfig.retryThrottling.tokenRatio, RETRY_THROTTLER_MAP.get(this.getTarget())));
} else {
RETRY_THROTTLER_MAP.delete(this.getTarget());
}
if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded');
}
Expand All @@ -243,6 +259,7 @@ export class InternalChannel {
}
this.configSelectionQueue = [];
});

},
(status) => {
if (this.channelzEnabled) {
Expand Down Expand Up @@ -405,6 +422,24 @@ export class InternalChannel {
return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber);
}

createRetryingCall(
callConfig: CallConfig,
method: string,
host: string,
credentials: CallCredentials,
deadline: Deadline
): RetryingCall {
const callNumber = getNextCallNumber();
this.trace(
'createRetryingCall [' +
callNumber +
'] method="' +
method +
'"'
);
return new RetryingCall(this, callConfig, method, host, credentials, deadline, callNumber, this.retryBufferTracker, RETRY_THROTTLER_MAP.get(this.getTarget()))
}

createInnerCall(
callConfig: CallConfig,
method: string,
Expand All @@ -413,7 +448,11 @@ export class InternalChannel {
deadline: Deadline
): Call {
// Create a RetryingCall if retries are enabled
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
if (this.options['grpc.enable_retries'] === 0) {
return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline);
} else {
return this.createRetryingCall(callConfig, method, host, credentials, deadline);
}
}

createResolvingCall(
Expand All @@ -439,7 +478,7 @@ export class InternalChannel {
parentCall: parentCall,
};

const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), getNextCallNumber());
const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), callNumber);

if (this.channelzEnabled) {
this.callTracker.addCallStarted();
Expand Down
16 changes: 14 additions & 2 deletions packages/grpc-js/src/load-balancing-call.ts
Expand Up @@ -29,6 +29,7 @@ import { CallConfig } from "./resolver";
import { splitHostPort } from "./uri-parser";
import * as logging from './logging';
import { restrictControlPlaneStatusCode } from "./control-plane-status";
import * as http2 from 'http2';

const TRACER_NAME = 'load_balancing_call';

Expand All @@ -38,6 +39,10 @@ export interface StatusObjectWithProgress extends StatusObject {
progress: RpcProgress;
}

export interface LoadBalancingCallInterceptingListener extends InterceptingListener {
onReceiveStatus(status: StatusObjectWithProgress): void;
}

export class LoadBalancingCall implements Call {
private child: SubchannelCall | null = null;
private readPending = false;
Expand Down Expand Up @@ -145,13 +150,20 @@ export class LoadBalancingCall implements Call {
try {
this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, {
onReceiveMetadata: metadata => {
this.trace('Received metadata');
this.listener!.onReceiveMetadata(metadata);
},
onReceiveMessage: message => {
this.trace('Received message');
this.listener!.onReceiveMessage(message);
},
onReceiveStatus: status => {
this.outputStatus(status, 'PROCESSED');
this.trace('Received status');
if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) {
this.outputStatus(status, 'REFUSED');
} else {
this.outputStatus(status, 'PROCESSED');
}
}
});
} catch (error) {
Expand Down Expand Up @@ -226,7 +238,7 @@ export class LoadBalancingCall implements Call {
getPeer(): string {
return this.child?.getPeer() ?? this.channel.getTarget();
}
start(metadata: Metadata, listener: InterceptingListener): void {
start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void {
this.trace('start called');
this.listener = listener;
this.metadata = metadata;
Expand Down
3 changes: 2 additions & 1 deletion packages/grpc-js/src/resolving-load-balancer.ts
Expand Up @@ -83,7 +83,7 @@ function getDefaultConfigSelector(
}

export interface ResolutionCallback {
(configSelector: ConfigSelector): void;
(serviceConfig: ServiceConfig, configSelector: ConfigSelector): void;
}

export interface ResolutionFailureCallback {
Expand Down Expand Up @@ -239,6 +239,7 @@ export class ResolvingLoadBalancer implements LoadBalancer {
const finalServiceConfig =
workingServiceConfig ?? this.defaultServiceConfig;
this.onSuccessfulResolution(
finalServiceConfig,
configSelector ?? getDefaultConfigSelector(finalServiceConfig)
);
},
Expand Down

0 comments on commit 89e132a

Please sign in to comment.