diff --git a/packages/grpc-js/README.md b/packages/grpc-js/README.md
index 11a8ca9aa..3d698dfd8 100644
--- a/packages/grpc-js/README.md
+++ b/packages/grpc-js/README.md
@@ -59,6 +59,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
  - `grpc.default_compression_algorithm`
  - `grpc.enable_channelz`
  - `grpc.dns_min_time_between_resolutions_ms`
+ - `grpc.enable_retries`
+ - `grpc.per_rpc_retry_buffer_size`
+ - `grpc.retry_buffer_size`
 - `grpc-node.max_session_memory`
 - `channelOverride`
 - `channelFactoryOverride`
diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts
index c05253e9b..8830ed43d 100644
--- a/packages/grpc-js/src/channel-options.ts
+++ b/packages/grpc-js/src/channel-options.ts
@@ -44,6 +44,14 @@ export interface ChannelOptions {
   'grpc.default_compression_algorithm'?: CompressionAlgorithms;
   'grpc.enable_channelz'?: number;
   'grpc.dns_min_time_between_resolutions_ms'?: number;
+  'grpc.enable_retries'?: number;
+  'grpc.per_rpc_retry_buffer_size'?: number;
+  /* This option is patterned after a core option, but the core does not have
+   * this option. It is closely related to the option
+   * grpc.per_rpc_retry_buffer_size, which is in the core. The core will likely
+   * implement this functionality using the ResourceQuota mechanism, so there
+   * will probably not be any collision or other inconsistency. */
+  'grpc.retry_buffer_size'?: number;
   'grpc.max_connection_age_ms'?: number;
   'grpc.max_connection_age_grace_ms'?: number;
   'grpc-node.max_session_memory'?: number;
@@ -73,6 +81,9 @@ export const recognizedOptions = {
   'grpc.enable_http_proxy': true,
   'grpc.enable_channelz': true,
   'grpc.dns_min_time_between_resolutions_ms': true,
+  'grpc.enable_retries': true,
+  'grpc.per_rpc_retry_buffer_size': true,
+  'grpc.retry_buffer_size': true,
   'grpc.max_connection_age_ms': true,
   'grpc.max_connection_age_grace_ms': true,
   'grpc-node.max_session_memory': true,
diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts
index a8c6dc964..9137be272 100644
--- a/packages/grpc-js/src/internal-channel.ts
+++ b/packages/grpc-js/src/internal-channel.ts
@@ -50,6 +50,7 @@ import { Deadline, getDeadlineTimeoutString } from './deadline';
 import { ResolvingCall } from './resolving-call';
 import { getNextCallNumber } from './call-number';
 import { restrictControlPlaneStatusCode } from './control-plane-status';
+import { MessageBufferTracker, RetryingCall, RetryThrottler } from './retrying-call';
 /**
  * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args
  */
@@ -78,6 +79,11 @@ interface ErrorConfigResult {
 
 type GetConfigResult = NoneConfigResult | SuccessConfigResult | ErrorConfigResult;
 
+const RETRY_THROTTLER_MAP: Map<string, RetryThrottler> = new Map<string, RetryThrottler>();
+
+const DEFAULT_RETRY_BUFFER_SIZE_BYTES = 1<<24; // 16 MB
+const DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES = 1<<20; // 1 MB
+
 export class InternalChannel {
 
   private resolvingLoadBalancer: ResolvingLoadBalancer;
@@ -111,6 +117,7 @@ export class InternalChannel {
    * than TRANSIENT_FAILURE.
    */
   private currentResolutionError: StatusObject | null = null;
+  private retryBufferTracker: MessageBufferTracker;
 
   // Channelz info
   private readonly channelzEnabled: boolean = true;
@@ -179,6 +186,10 @@ export class InternalChannel {
     this.subchannelPool = getSubchannelPool(
       (options['grpc.use_local_subchannel_pool'] ?? 0) === 0
     );
+    this.retryBufferTracker = new MessageBufferTracker(
+      options['grpc.retry_buffer_size'] ??
DEFAULT_RETRY_BUFFER_SIZE_BYTES, + options['grpc.per_rpc_retry_buffer_size'] ?? DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES + ); const channelControlHelper: ChannelControlHelper = { createSubchannel: ( subchannelAddress: SubchannelAddress, @@ -226,7 +237,12 @@ export class InternalChannel { this.target, channelControlHelper, options, - (configSelector) => { + (serviceConfig, configSelector) => { + if (serviceConfig.retryThrottling) { + RETRY_THROTTLER_MAP.set(this.getTarget(), new RetryThrottler(serviceConfig.retryThrottling.maxTokens, serviceConfig.retryThrottling.tokenRatio, RETRY_THROTTLER_MAP.get(this.getTarget()))); + } else { + RETRY_THROTTLER_MAP.delete(this.getTarget()); + } if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); } @@ -243,6 +259,7 @@ export class InternalChannel { } this.configSelectionQueue = []; }); + }, (status) => { if (this.channelzEnabled) { @@ -405,6 +422,24 @@ export class InternalChannel { return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, callNumber); } + createRetryingCall( + callConfig: CallConfig, + method: string, + host: string, + credentials: CallCredentials, + deadline: Deadline + ): RetryingCall { + const callNumber = getNextCallNumber(); + this.trace( + 'createRetryingCall [' + + callNumber + + '] method="' + + method + + '"' + ); + return new RetryingCall(this, callConfig, method, host, credentials, deadline, callNumber, this.retryBufferTracker, RETRY_THROTTLER_MAP.get(this.getTarget())) + } + createInnerCall( callConfig: CallConfig, method: string, @@ -413,7 +448,11 @@ export class InternalChannel { deadline: Deadline ): Call { // Create a RetryingCall if retries are enabled - return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline); + if (this.options['grpc.enable_retries'] === 0) { + return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline); + } else { + return this.createRetryingCall(callConfig, method, host, credentials, deadline); + } } createResolvingCall( @@ -439,7 +478,7 @@ export class InternalChannel { parentCall: parentCall, }; - const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), getNextCallNumber()); + const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory.clone(), this.credentials._getCallCredentials(), callNumber); if (this.channelzEnabled) { this.callTracker.addCallStarted(); diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index 6faa15592..48aaf48ac 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -29,6 +29,7 @@ import { CallConfig } from "./resolver"; import { splitHostPort } from "./uri-parser"; import * as logging from './logging'; import { restrictControlPlaneStatusCode } from "./control-plane-status"; +import * as http2 from 'http2'; const TRACER_NAME = 'load_balancing_call'; @@ -38,6 +39,10 @@ export interface StatusObjectWithProgress extends StatusObject { progress: RpcProgress; } +export interface LoadBalancingCallInterceptingListener extends InterceptingListener { + onReceiveStatus(status: StatusObjectWithProgress): void; +} + export class LoadBalancingCall implements Call { private child: SubchannelCall | null = null; private readPending = false; @@ -145,13 +150,20 @@ export class LoadBalancingCall implements Call { try { this.child = 
pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, { onReceiveMetadata: metadata => { + this.trace('Received metadata'); this.listener!.onReceiveMetadata(metadata); }, onReceiveMessage: message => { + this.trace('Received message'); this.listener!.onReceiveMessage(message); }, onReceiveStatus: status => { - this.outputStatus(status, 'PROCESSED'); + this.trace('Received status'); + if (status.rstCode === http2.constants.NGHTTP2_REFUSED_STREAM) { + this.outputStatus(status, 'REFUSED'); + } else { + this.outputStatus(status, 'PROCESSED'); + } } }); } catch (error) { @@ -226,7 +238,7 @@ export class LoadBalancingCall implements Call { getPeer(): string { return this.child?.getPeer() ?? this.channel.getTarget(); } - start(metadata: Metadata, listener: InterceptingListener): void { + start(metadata: Metadata, listener: LoadBalancingCallInterceptingListener): void { this.trace('start called'); this.listener = listener; this.metadata = metadata; diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index da097cc42..a39606f2c 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -83,7 +83,7 @@ function getDefaultConfigSelector( } export interface ResolutionCallback { - (configSelector: ConfigSelector): void; + (serviceConfig: ServiceConfig, configSelector: ConfigSelector): void; } export interface ResolutionFailureCallback { @@ -239,6 +239,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { const finalServiceConfig = workingServiceConfig ?? this.defaultServiceConfig; this.onSuccessfulResolution( + finalServiceConfig, configSelector ?? getDefaultConfigSelector(finalServiceConfig) ); }, diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts new file mode 100644 index 000000000..f9bda9eb5 --- /dev/null +++ b/packages/grpc-js/src/retrying-call.ts @@ -0,0 +1,639 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ */
+
+import { CallCredentials } from "./call-credentials";
+import { LogVerbosity, Status } from "./constants";
+import { Deadline } from "./deadline";
+import { Metadata } from "./metadata";
+import { CallConfig } from "./resolver";
+import * as logging from './logging';
+import { Call, InterceptingListener, MessageContext, StatusObject, WriteCallback, WriteObject } from "./call-interface";
+import { LoadBalancingCall, StatusObjectWithProgress } from "./load-balancing-call";
+import { InternalChannel } from "./internal-channel";
+
+const TRACER_NAME = 'retrying_call';
+
+export class RetryThrottler {
+  private tokens: number;
+  constructor(private readonly maxTokens: number, private readonly tokenRatio: number, previousRetryThrottler?: RetryThrottler) {
+    if (previousRetryThrottler) {
+      /* When carrying over tokens from a previous config, rescale them to the
+       * new max value */
+      this.tokens = previousRetryThrottler.tokens * (maxTokens / previousRetryThrottler.maxTokens);
+    } else {
+      this.tokens = maxTokens;
+    }
+  }
+
+  addCallSucceeded() {
+    this.tokens = Math.min(this.tokens + this.tokenRatio, this.maxTokens);
+  }
+
+  addCallFailed() {
+    this.tokens = Math.max(this.tokens - 1, 0);
+  }
+
+  canRetryCall() {
+    return this.tokens > this.maxTokens / 2;
+  }
+}
+
+export class MessageBufferTracker {
+  private totalAllocated: number = 0;
+  private allocatedPerCall: Map<number, number> = new Map<number, number>();
+
+  constructor(private totalLimit: number, private limitPerCall: number) {}
+
+  allocate(size: number, callId: number): boolean {
+    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
+    if (this.limitPerCall - currentPerCall < size || this.totalLimit - this.totalAllocated < size) {
+      return false;
+    }
+    this.allocatedPerCall.set(callId, currentPerCall + size);
+    this.totalAllocated += size;
+    return true;
+  }
+
+  free(size: number, callId: number) {
+    if (this.totalAllocated < size) {
+      throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > total allocated ${this.totalAllocated}`);
+    }
+    this.totalAllocated -= size;
+    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
+    if (currentPerCall < size) {
+      throw new Error(`Invalid buffer allocation state: call ${callId} freed ${size} > allocated for call ${currentPerCall}`);
+    }
+    this.allocatedPerCall.set(callId, currentPerCall - size);
+  }
+
+  freeAll(callId: number) {
+    const currentPerCall = this.allocatedPerCall.get(callId) ?? 0;
+    if (this.totalAllocated < currentPerCall) {
+      throw new Error(`Invalid buffer allocation state: call ${callId} allocated ${currentPerCall} > total allocated ${this.totalAllocated}`);
+    }
+    this.totalAllocated -= currentPerCall;
+    this.allocatedPerCall.delete(callId);
+  }
+}
+
+type UnderlyingCallState = 'ACTIVE' | 'COMPLETED';
+
+interface UnderlyingCall {
+  state: UnderlyingCallState;
+  call: LoadBalancingCall;
+  nextMessageToSend: number;
+}
+
+/**
+ * A retrying call can be in one of these states:
+ * RETRY: Retries are configured and new attempts may be sent
+ * HEDGING: Hedging is configured and new attempts may be sent
+ * TRANSPARENT_ONLY: Neither retries nor hedging are configured, and
+ *     transparent retry attempts may still be sent
+ * COMMITTED: One attempt is committed, and no new attempts will be
+ *     sent
+ */
+type RetryingCallState = 'RETRY' | 'HEDGING' | 'TRANSPARENT_ONLY' | 'COMMITTED';
+
+/**
+ * The different types of objects that can be stored in the write buffer, with
+ * the following meanings:
+ * MESSAGE: This is a message to be sent.
+ * HALF_CLOSE: When this entry is reached, the calls should send a half-close. + * FREED: This slot previously contained a message that has been sent on all + * child calls and is no longer needed. + */ +type WriteBufferEntryType = 'MESSAGE' | 'HALF_CLOSE' | 'FREED'; + +/** + * Entry in the buffer of messages to send to the remote end. + */ +interface WriteBufferEntry { + entryType: WriteBufferEntryType; + /** + * Message to send. + * Only populated if entryType is MESSAGE. + */ + message?: WriteObject; + /** + * Callback to call after sending the message. + * Only populated if entryType is MESSAGE and the call is in the COMMITTED + * state. + */ + callback?: WriteCallback; + /** + * Indicates whether the message is allocated in the buffer tracker. Ignored + * if entryType is not MESSAGE. Should be the return value of + * bufferTracker.allocate. + */ + allocated: boolean; +} + +const PREVIONS_RPC_ATTEMPTS_METADATA_KEY = 'grpc-previous-rpc-attempts'; + +export class RetryingCall implements Call { + private state: RetryingCallState; + private listener: InterceptingListener | null = null; + private initialMetadata: Metadata | null = null; + private underlyingCalls: UnderlyingCall[] = []; + private writeBuffer: WriteBufferEntry[] = []; + /** + * Tracks whether a read has been started, so that we know whether to start + * reads on new child calls. This only matters for the first read, because + * once a message comes in the child call becomes committed and there will + * be no new child calls. + */ + private readStarted = false; + private transparentRetryUsed: boolean = false; + /** + * Number of attempts so far + */ + private attempts: number = 0; + private hedgingTimer: NodeJS.Timer | null = null; + private committedCallIndex: number | null = null; + private initialRetryBackoffSec = 0; + private nextRetryBackoffSec = 0; + constructor( + private readonly channel: InternalChannel, + private readonly callConfig: CallConfig, + private readonly methodName: string, + private readonly host: string, + private readonly credentials: CallCredentials, + private readonly deadline: Deadline, + private readonly callNumber: number, + private readonly bufferTracker: MessageBufferTracker, + private readonly retryThrottler?: RetryThrottler + ) { + if (callConfig.methodConfig.retryPolicy) { + this.state = 'RETRY'; + const retryPolicy = callConfig.methodConfig.retryPolicy; + this.nextRetryBackoffSec = this.initialRetryBackoffSec = Number(retryPolicy.initialBackoff.substring(0, retryPolicy.initialBackoff.length - 1)); + } else if (callConfig.methodConfig.hedgingPolicy) { + this.state = 'HEDGING'; + } else { + this.state = 'TRANSPARENT_ONLY'; + } + } + getCallNumber(): number { + return this.callNumber; + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callNumber + '] ' + text + ); + } + + private reportStatus(statusObject: StatusObject) { + this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"'); + process.nextTick(() => { + this.listener?.onReceiveStatus(statusObject); + }); + } + + cancelWithStatus(status: Status, details: string): void { + this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"'); + this.reportStatus({code: status, details, metadata: new Metadata()}); + for (const {call} of this.underlyingCalls) { + call.cancelWithStatus(status, details); + } + } + getPeer(): string { + if (this.committedCallIndex !== null) { + return 
this.underlyingCalls[this.committedCallIndex].call.getPeer(); + } else { + return 'unknown'; + } + } + + private maybefreeMessageBufferEntry(messageIndex: number) { + if (this.state !== 'COMMITTED') { + return; + } + const bufferEntry = this.writeBuffer[messageIndex]; + if (bufferEntry.entryType === 'MESSAGE') { + if (bufferEntry.allocated) { + this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber); + } + this.writeBuffer[messageIndex] = { + entryType: 'FREED', + allocated: false + }; + } + } + + private commitCall(index: number) { + if (this.state === 'COMMITTED') { + return; + } + if (this.underlyingCalls[index].state === 'COMPLETED') { + return; + } + this.trace('Committing call [' + this.underlyingCalls[index].call.getCallNumber() + '] at index ' + index); + this.state = 'COMMITTED'; + this.committedCallIndex = index; + for (let i = 0; i < this.underlyingCalls.length; i++) { + if (i === index) { + continue; + } + if (this.underlyingCalls[i].state === 'COMPLETED') { + continue; + } + this.underlyingCalls[i].state = 'COMPLETED'; + this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt'); + } + for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) { + this.maybefreeMessageBufferEntry(messageIndex); + } + } + + private commitCallWithMostMessages() { + let mostMessages = -1; + let callWithMostMessages = -1; + for (const [index, childCall] of this.underlyingCalls.entries()) { + if (childCall.nextMessageToSend > mostMessages) { + mostMessages = childCall.nextMessageToSend; + callWithMostMessages = index; + } + } + this.commitCall(callWithMostMessages); + } + + private isStatusCodeInList(list: (Status | string)[], code: Status) { + return list.some((value => value === code || value.toString().toLowerCase() === Status[code].toLowerCase())); + } + + private getNextRetryBackoffMs() { + const retryPolicy = this.callConfig?.methodConfig.retryPolicy; + if (!retryPolicy) { + return 0; + } + const nextBackoffMs = Math.random() * this.nextRetryBackoffSec * 1000; + const maxBackoffSec = Number(retryPolicy.maxBackoff.substring(0, retryPolicy.maxBackoff.length - 1)); + this.nextRetryBackoffSec = Math.min(this.nextRetryBackoffSec * retryPolicy.backoffMultiplier, maxBackoffSec); + return nextBackoffMs + } + + private maybeRetryCall(pushback: number | null, callback: (retried: boolean) => void) { + if (this.state !== 'RETRY') { + callback(false); + return; + } + const retryPolicy = this.callConfig!.methodConfig.retryPolicy!; + if (this.attempts >= Math.min(retryPolicy.maxAttempts, 5)) { + callback(false); + return; + } + let retryDelayMs: number; + if (pushback === null) { + retryDelayMs = this.getNextRetryBackoffMs(); + } else if (pushback < 0) { + this.state = 'TRANSPARENT_ONLY'; + callback(false); + return; + } else { + retryDelayMs = pushback; + this.nextRetryBackoffSec = this.initialRetryBackoffSec; + } + setTimeout(() => { + if (this.state !== 'RETRY') { + callback(false); + return; + } + if (this.retryThrottler?.canRetryCall() ?? 
true) { + callback(true); + this.attempts += 1; + this.startNewAttempt(); + } + }, retryDelayMs); + } + + private countActiveCalls(): number { + let count = 0; + for (const call of this.underlyingCalls) { + if (call?.state === 'ACTIVE') { + count += 1; + } + } + return count; + } + + private handleProcessedStatus(status: StatusObject, callIndex: number, pushback: number | null) { + switch (this.state) { + case 'COMMITTED': + case 'TRANSPARENT_ONLY': + this.commitCall(callIndex); + this.reportStatus(status); + break; + case 'HEDGING': + if (this.isStatusCodeInList(this.callConfig!.methodConfig.hedgingPolicy!.nonFatalStatusCodes ?? [], status.code)) { + this.retryThrottler?.addCallFailed(); + let delayMs: number; + if (pushback === null) { + delayMs = 0; + } else if (pushback < 0) { + this.state = 'TRANSPARENT_ONLY'; + this.commitCall(callIndex); + this.reportStatus(status); + return; + } else { + delayMs = pushback; + } + setTimeout(() => { + this.maybeStartHedgingAttempt(); + // If after trying to start a call there are no active calls, this was the last one + if (this.countActiveCalls() === 0) { + this.commitCall(callIndex); + this.reportStatus(status); + } + }, delayMs); + } else { + this.commitCall(callIndex); + this.reportStatus(status); + } + break; + case 'RETRY': + if (this.isStatusCodeInList(this.callConfig!.methodConfig.retryPolicy!.retryableStatusCodes, status.code)) { + this.retryThrottler?.addCallFailed(); + this.maybeRetryCall(pushback, (retried) => { + if (!retried) { + this.commitCall(callIndex); + this.reportStatus(status); + } + }); + } else { + this.commitCall(callIndex); + this.reportStatus(status); + } + break; + } + } + + private getPushback(metadata: Metadata): number | null { + const mdValue = metadata.get('grpc-retry-pushback-ms'); + if (mdValue.length === 0) { + return null; + } + try { + return parseInt(mdValue[0] as string); + } catch (e) { + return -1; + } + } + + private handleChildStatus(status: StatusObjectWithProgress, callIndex: number) { + if (this.underlyingCalls[callIndex].state === 'COMPLETED') { + return; + } + this.trace('state=' + this.state + ' handling status with progress ' + status.progress + ' from child [' + this.underlyingCalls[callIndex].call.getCallNumber() + '] in state ' + this.underlyingCalls[callIndex].state); + this.underlyingCalls[callIndex].state = 'COMPLETED'; + if (status.code === Status.OK) { + this.retryThrottler?.addCallSucceeded(); + this.commitCall(callIndex); + this.reportStatus(status); + return; + } + if (this.state === 'COMMITTED') { + this.reportStatus(status); + return; + } + const pushback = this.getPushback(status.metadata); + switch (status.progress) { + case 'NOT_STARTED': + // RPC never leaves the client, always safe to retry + this.startNewAttempt(); + break; + case 'REFUSED': + // RPC reaches the server library, but not the server application logic + if (this.transparentRetryUsed) { + this.handleProcessedStatus(status, callIndex, pushback); + } else { + this.transparentRetryUsed = true; + this.startNewAttempt(); + }; + break; + case 'DROP': + this.commitCall(callIndex); + this.reportStatus(status); + break; + case 'PROCESSED': + this.handleProcessedStatus(status, callIndex, pushback); + break; + } + } + + private maybeStartHedgingAttempt() { + if (this.state !== 'HEDGING') { + return; + } + if (!this.callConfig.methodConfig.hedgingPolicy) { + return; + } + const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; + if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { + return; + } + this.attempts 
+= 1; + this.startNewAttempt(); + this.maybeStartHedgingTimer(); + } + + private maybeStartHedgingTimer() { + if (this.hedgingTimer) { + clearTimeout(this.hedgingTimer); + } + if (this.state !== 'HEDGING') { + return; + } + if (!this.callConfig.methodConfig.hedgingPolicy) { + return; + } + const hedgingPolicy = this.callConfig.methodConfig.hedgingPolicy; + if (this.attempts >= Math.min(hedgingPolicy.maxAttempts, 5)) { + return; + } + const hedgingDelayString = hedgingPolicy.hedgingDelay ?? '0s'; + const hedgingDelaySec = Number(hedgingDelayString.substring(0, hedgingDelayString.length - 1)); + this.hedgingTimer = setTimeout(() => { + this.maybeStartHedgingAttempt(); + }, hedgingDelaySec * 1000); + this.hedgingTimer.unref?.(); + } + + private startNewAttempt() { + const child = this.channel.createLoadBalancingCall(this.callConfig, this.methodName, this.host, this.credentials, this.deadline); + this.trace('Created child call [' + child.getCallNumber() + '] for attempt ' + this.attempts); + const index = this.underlyingCalls.length; + this.underlyingCalls.push({state: 'ACTIVE', call: child, nextMessageToSend: 0}); + const previousAttempts = this.attempts - 1; + const initialMetadata = this.initialMetadata!.clone(); + if (previousAttempts > 0) { + initialMetadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`); + } + let receivedMetadata = false; + child.start(initialMetadata, { + onReceiveMetadata: metadata => { + this.trace('Received metadata from child [' + child.getCallNumber() + ']'); + this.commitCall(index); + receivedMetadata = true; + if (previousAttempts > 0) { + metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`); + } + if (this.underlyingCalls[index].state === 'ACTIVE') { + this.listener!.onReceiveMetadata(metadata); + } + }, + onReceiveMessage: message => { + this.trace('Received message from child [' + child.getCallNumber() + ']'); + this.commitCall(index); + if (this.underlyingCalls[index].state === 'ACTIVE') { + this.listener!.onReceiveMessage(message); + } + }, + onReceiveStatus: status => { + this.trace('Received status from child [' + child.getCallNumber() + ']'); + if (!receivedMetadata && previousAttempts > 0) { + status.metadata.set(PREVIONS_RPC_ATTEMPTS_METADATA_KEY, `${previousAttempts}`); + } + this.handleChildStatus(status, index); + } + }); + this.sendNextChildMessage(index); + if (this.readStarted) { + child.startRead(); + } + } + + start(metadata: Metadata, listener: InterceptingListener): void { + this.trace('start called'); + this.listener = listener; + this.initialMetadata = metadata; + this.attempts += 1; + this.startNewAttempt(); + this.maybeStartHedgingTimer(); + } + + private handleChildWriteCompleted(childIndex: number) { + const childCall = this.underlyingCalls[childIndex]; + const messageIndex = childCall.nextMessageToSend; + this.writeBuffer[messageIndex].callback?.(); + this.maybefreeMessageBufferEntry(messageIndex); + childCall.nextMessageToSend += 1; + this.sendNextChildMessage(childIndex); + } + + private sendNextChildMessage(childIndex: number) { + const childCall = this.underlyingCalls[childIndex]; + if (childCall.state === 'COMPLETED') { + return; + } + if (this.writeBuffer[childCall.nextMessageToSend]) { + const bufferEntry = this.writeBuffer[childCall.nextMessageToSend]; + switch (bufferEntry.entryType) { + case 'MESSAGE': + childCall.call.sendMessageWithContext({ + callback: (error) => { + // Ignore error + this.handleChildWriteCompleted(childIndex); + } + }, bufferEntry.message!.message); + break; + case 
'HALF_CLOSE': + childCall.nextMessageToSend += 1; + childCall.call.halfClose(); + break; + case 'FREED': + // Should not be possible + break; + } + } + } + + sendMessageWithContext(context: MessageContext, message: Buffer): void { + this.trace('write() called with message of length ' + message.length); + const writeObj: WriteObject = { + message, + flags: context.flags, + }; + const messageIndex = this.writeBuffer.length; + const bufferEntry: WriteBufferEntry = { + entryType: 'MESSAGE', + message: writeObj, + allocated: this.bufferTracker.allocate(message.length, this.callNumber) + }; + this.writeBuffer[messageIndex] = bufferEntry; + if (bufferEntry.allocated) { + context.callback?.(); + for (const [callIndex, call] of this.underlyingCalls.entries()) { + if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { + call.call.sendMessageWithContext({ + callback: (error) => { + // Ignore error + this.handleChildWriteCompleted(callIndex); + } + }, message); + } + } + } else { + this.commitCallWithMostMessages(); + const call = this.underlyingCalls[this.committedCallIndex!]; + bufferEntry.callback = context.callback; + if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { + call.call.sendMessageWithContext({ + callback: (error) => { + // Ignore error + this.handleChildWriteCompleted(this.committedCallIndex!); + } + }, message); + } + } + } + startRead(): void { + this.trace('startRead called'); + this.readStarted = true; + for (const underlyingCall of this.underlyingCalls) { + if (underlyingCall?.state === 'ACTIVE') { + underlyingCall.call.startRead(); + } + } + } + halfClose(): void { + this.trace('halfClose called'); + const halfCloseIndex = this.writeBuffer.length; + this.writeBuffer[halfCloseIndex] = { + entryType: 'HALF_CLOSE', + allocated: false + }; + for (const call of this.underlyingCalls) { + if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) { + call.nextMessageToSend += 1; + call.call.halfClose(); + } + } + } + setCredentials(newCredentials: CallCredentials): void { + throw new Error("Method not implemented."); + } + getMethod(): string { + return this.methodName; + } + getHost(): string { + return this.host; + } +} \ No newline at end of file diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index a000b8ffe..631ec7675 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -709,20 +709,29 @@ export class Http2ServerCallStream< if (this.deadlineTimer) clearTimeout(this.deadlineTimer); - if (!this.wantTrailers) { - this.wantTrailers = true; - this.stream.once('wantTrailers', () => { - const trailersToSend = { - [GRPC_STATUS_HEADER]: statusObj.code, - [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), - ...statusObj.metadata?.toHttp2Headers(), - }; - - this.stream.sendTrailers(trailersToSend); - this.statusSent = true; - }); - this.sendMetadata(); - this.stream.end(); + if (this.stream.headersSent) { + if (!this.wantTrailers) { + this.wantTrailers = true; + this.stream.once('wantTrailers', () => { + const trailersToSend = { + [GRPC_STATUS_HEADER]: statusObj.code, + [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), + ...statusObj.metadata?.toHttp2Headers(), + }; + + this.stream.sendTrailers(trailersToSend); + this.statusSent = true; + }); + this.stream.end(); + } + } else { + const trailersToSend = { + [GRPC_STATUS_HEADER]: statusObj.code, + [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), + ...statusObj.metadata?.toHttp2Headers(), + }; + 
this.stream.respond(trailersToSend, {endStream: true}); + this.statusSent = true; } } diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index 12802dad8..201c0c648 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -50,7 +50,7 @@ export interface RetryPolicy { export interface HedgingPolicy { maxAttempts: number; hedgingDelay?: string; - nonFatalStatusCodes: (Status | string)[]; + nonFatalStatusCodes?: (Status | string)[]; } export interface MethodConfig { @@ -86,7 +86,7 @@ export interface ServiceConfigCanaryConfig { * Recognizes a number with up to 9 digits after the decimal point, followed by * an "s", representing a number of seconds. */ -const TIMEOUT_REGEX = /^\d+(\.\d{1,9})?s$/; +const DURATION_REGEX = /^\d+(\.\d{1,9})?s$/; /** * Client language name used for determining whether this client matches a @@ -111,6 +111,81 @@ function validateName(obj: any): MethodConfigName { return result; } +function validateRetryPolicy(obj: any): RetryPolicy { + if (!('maxAttempts' in obj) || !Number.isInteger(obj.maxAttempts) || obj.maxAttempts < 2) { + throw new Error('Invalid method config retry policy: maxAttempts must be an integer at least 2'); + } + if (!('initialBackoff' in obj) || typeof obj.initialBackoff !== 'string' || !DURATION_REGEX.test(obj.initialBackoff)) { + throw new Error('Invalid method config retry policy: initialBackoff must be a string consisting of a positive integer followed by s'); + } + if (!('maxBackoff' in obj) || typeof obj.maxBackoff !== 'string' || !DURATION_REGEX.test(obj.maxBackoff)) { + throw new Error('Invalid method config retry policy: maxBackoff must be a string consisting of a positive integer followed by s'); + } + if (!('backoffMultiplier' in obj) || typeof obj.backoffMultiplier !== 'number' || obj.backoffMultiplier <= 0) { + throw new Error('Invalid method config retry policy: backoffMultiplier must be a number greater than 0'); + } + if (!(('retryableStatusCodes' in obj) && Array.isArray(obj.retryableStatusCodes))) { + throw new Error('Invalid method config retry policy: retryableStatusCodes is required'); + } + if (obj.retryableStatusCodes.length === 0) { + throw new Error('Invalid method config retry policy: retryableStatusCodes must be non-empty'); + } + for (const value of obj.retryableStatusCodes) { + if (typeof value === 'number') { + if (!Object.values(Status).includes(value)) { + throw new Error('Invalid method config retry policy: retryableStatusCodes value not in status code range'); + } + } else if (typeof value === 'string') { + if (!Object.values(Status).includes(value.toUpperCase())) { + throw new Error('Invalid method config retry policy: retryableStatusCodes value not a status code name'); + } + } else { + throw new Error('Invalid method config retry policy: retryableStatusCodes value must be a string or number'); + } + } + return { + maxAttempts: obj.maxAttempts, + initialBackoff: obj.initialBackoff, + maxBackoff: obj.maxBackoff, + backoffMultiplier: obj.backoffMultiplier, + retryableStatusCodes: obj.retryableStatusCodes + }; +} + +function validateHedgingPolicy(obj: any): HedgingPolicy { + if (!('maxAttempts' in obj) || !Number.isInteger(obj.maxAttempts) || obj.maxAttempts < 2) { + throw new Error('Invalid method config hedging policy: maxAttempts must be an integer at least 2'); + } + if (('hedgingDelay' in obj) && (typeof obj.hedgingDelay !== 'string' || !DURATION_REGEX.test(obj.hedgingDelay))) { + throw new Error('Invalid method config hedging 
policy: hedgingDelay must be a string consisting of a positive integer followed by s'); + } + if (('nonFatalStatusCodes' in obj) && Array.isArray(obj.nonFatalStatusCodes)) { + for (const value of obj.nonFatalStatusCodes) { + if (typeof value === 'number') { + if (!Object.values(Status).includes(value)) { + throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value not in status code range'); + } + } else if (typeof value === 'string') { + if (!Object.values(Status).includes(value.toUpperCase())) { + throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value not a status code name'); + } + } else { + throw new Error('Invlid method config hedging policy: nonFatalStatusCodes value must be a string or number'); + } + } + } + const result: HedgingPolicy = { + maxAttempts: obj.maxAttempts + } + if (obj.hedgingDelay) { + result.hedgingDelay = obj.hedgingDelay; + } + if (obj.nonFatalStatusCodes) { + result.nonFatalStatusCodes = obj.nonFatalStatusCodes; + } + return result; +} + function validateMethodConfig(obj: any): MethodConfig { const result: MethodConfig = { name: [], @@ -144,7 +219,7 @@ function validateMethodConfig(obj: any): MethodConfig { result.timeout = obj.timeout; } else if ( typeof obj.timeout === 'string' && - TIMEOUT_REGEX.test(obj.timeout) + DURATION_REGEX.test(obj.timeout) ) { const timeoutParts = obj.timeout .substring(0, obj.timeout.length - 1) @@ -169,9 +244,31 @@ function validateMethodConfig(obj: any): MethodConfig { } result.maxResponseBytes = obj.maxResponseBytes; } + if ('retryPolicy' in obj) { + if ('hedgingPolicy' in obj) { + throw new Error('Invalid method config: retryPolicy and hedgingPolicy cannot both be specified'); + } else { + result.retryPolicy = validateRetryPolicy(obj.retryPolicy); + } + } else if ('hedgingPolicy' in obj) { + result.hedgingPolicy = validateHedgingPolicy(obj.hedgingPolicy); + } return result; } +export function validateRetryThrottling(obj: any): RetryThrottling { + if (!('maxTokens' in obj) || typeof obj.maxTokens !== 'number' || obj.maxTokens <=0 || obj.maxTokens > 1000) { + throw new Error('Invalid retryThrottling: maxTokens must be a number in (0, 1000]'); + } + if (!('tokenRatio' in obj) || typeof obj.tokenRatio !== 'number' || obj.tokenRatio <= 0) { + throw new Error('Invalid retryThrottling: tokenRatio must be a number greater than 0'); + } + return { + maxTokens: +(obj.maxTokens as number).toFixed(3), + tokenRatio: +(obj.tokenRatio as number).toFixed(3) + }; +} + export function validateServiceConfig(obj: any): ServiceConfig { const result: ServiceConfig = { loadBalancingConfig: [], @@ -200,6 +297,9 @@ export function validateServiceConfig(obj: any): ServiceConfig { } } } + if ('retryThrottling' in obj) { + result.retryThrottling = validateRetryThrottling(obj.retryThrottling); + } // Validate method name uniqueness const seenMethodNames: MethodConfigName[] = []; for (const methodConfig of result.methodConfig) { diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index e2cd645af..2bc6fb0c7 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -75,6 +75,14 @@ export interface SubchannelCall { getCallNumber(): number; } +export interface StatusObjectWithRstCode extends StatusObject { + rstCode?: number; +} + +export interface SubchannelCallInterceptingListener extends InterceptingListener { + onReceiveStatus(status: StatusObjectWithRstCode): void; +} + export class Http2SubchannelCall implements 
SubchannelCall { private decoder = new StreamDecoder(); @@ -103,7 +111,7 @@ export class Http2SubchannelCall implements SubchannelCall { constructor( private readonly http2Stream: http2.ClientHttp2Stream, private readonly callStatsTracker: SubchannelCallStatsTracker, - private readonly listener: InterceptingListener, + private readonly listener: SubchannelCallInterceptingListener, private readonly subchannel: Subchannel, private readonly callId: number ) { @@ -257,7 +265,7 @@ export class Http2SubchannelCall implements SubchannelCall { // This is OK, because status codes emitted here correspond to more // catastrophic issues that prevent us from receiving trailers in the // first place. - this.endCall({ code, details, metadata: new Metadata() }); + this.endCall({ code, details, metadata: new Metadata(), rstCode: http2Stream.rstCode }); }); }); http2Stream.on('error', (err: SystemError) => { @@ -329,7 +337,7 @@ export class Http2SubchannelCall implements SubchannelCall { * Subsequent calls are no-ops. * @param status The status of the call. */ - private endCall(status: StatusObject): void { + private endCall(status: StatusObjectWithRstCode): void { /* If the status is OK and a new status comes in (e.g. from a * deserialization failure), that new status takes priority */ if (this.finalStatus === null || this.finalStatus.code === Status.OK) { diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 104c36c24..0d9773b30 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -36,7 +36,7 @@ import { } from './subchannel-address'; import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; import { ConnectivityStateListener } from './subchannel-interface'; -import { Http2SubchannelCall } from './subchannel-call'; +import { Http2SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call'; import { getNextCallNumber } from './call-number'; import { SubchannelCall } from './subchannel-call'; import { InterceptingListener, StatusObject } from './call-interface'; @@ -815,7 +815,7 @@ export class Subchannel { return false; } - createCall(metadata: Metadata, host: string, method: string, listener: InterceptingListener): SubchannelCall { + createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall { const headers = metadata.toHttp2Headers(); headers[HTTP2_HEADER_AUTHORITY] = host; headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; diff --git a/packages/grpc-js/test/test-retry-config.ts b/packages/grpc-js/test/test-retry-config.ts new file mode 100644 index 000000000..e27f236e2 --- /dev/null +++ b/packages/grpc-js/test/test-retry-config.ts @@ -0,0 +1,292 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +import assert = require("assert"); +import { validateServiceConfig } from "../src/service-config"; + +function createRetryServiceConfig(retryConfig: object): object { + return { + loadBalancingConfig: [], + methodConfig: [ + { + name: [{ + service: 'A', + method: 'B' + }], + + retryPolicy: retryConfig + } + ] + }; +} + +function createHedgingServiceConfig(hedgingConfig: object): object { + return { + loadBalancingConfig: [], + methodConfig: [ + { + name: [{ + service: 'A', + method: 'B' + }], + + hedgingPolicy: hedgingConfig + } + ] + }; +} + +function createThrottlingServiceConfig(retryThrottling: object): object { + return { + loadBalancingConfig: [], + methodConfig: [], + retryThrottling: retryThrottling + }; +} + +interface TestCase { + description: string; + config: object; + error: RegExp; +} + +const validRetryConfig = { + maxAttempts: 2, + initialBackoff: '1s', + maxBackoff: '1s', + backoffMultiplier: 1, + retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED'] +}; + +const RETRY_TEST_CASES: TestCase[] = [ + { + description: 'omitted maxAttempts', + config: { + initialBackoff: '1s', + maxBackoff: '1s', + backoffMultiplier: 1, + retryableStatusCodes: [14] + }, + error: /retry policy: maxAttempts must be an integer at least 2/ + }, + { + description: 'a low maxAttempts', + config: {...validRetryConfig, maxAttempts: 1}, + error: /retry policy: maxAttempts must be an integer at least 2/ + }, + { + description: 'omitted initialBackoff', + config: { + maxAttempts: 2, + maxBackoff: '1s', + backoffMultiplier: 1, + retryableStatusCodes: [14] + }, + error: /retry policy: initialBackoff must be a string consisting of a positive integer followed by s/ + }, + { + description: 'a non-numeric initialBackoff', + config: {...validRetryConfig, initialBackoff: 'abcs'}, + error: /retry policy: initialBackoff must be a string consisting of a positive integer followed by s/ + }, + { + description: 'an initialBackoff without an s', + config: {...validRetryConfig, initialBackoff: '123'}, + error: /retry policy: initialBackoff must be a string consisting of a positive integer followed by s/ + }, + { + description: 'omitted maxBackoff', + config: { + maxAttempts: 2, + initialBackoff: '1s', + backoffMultiplier: 1, + retryableStatusCodes: [14] + }, + error: /retry policy: maxBackoff must be a string consisting of a positive integer followed by s/ + }, + { + description: 'a non-numeric maxBackoff', + config: {...validRetryConfig, maxBackoff: 'abcs'}, + error: /retry policy: maxBackoff must be a string consisting of a positive integer followed by s/ + }, + { + description: 'an maxBackoff without an s', + config: {...validRetryConfig, maxBackoff: '123'}, + error: /retry policy: maxBackoff must be a string consisting of a positive integer followed by s/ + }, + { + description: 'omitted backoffMultiplier', + config: { + maxAttempts: 2, + initialBackoff: '1s', + maxBackoff: '1s', + retryableStatusCodes: [14] + }, + error: /retry policy: backoffMultiplier must be a number greater than 0/ + }, + { + description: 'a negative backoffMultiplier', + config: {...validRetryConfig, backoffMultiplier: -1}, + error: /retry policy: backoffMultiplier must be a number greater than 0/ + }, + { + description: 'omitted retryableStatusCodes', + config: { + maxAttempts: 2, + initialBackoff: '1s', + maxBackoff: '1s', + backoffMultiplier: 1 + }, + error: /retry policy: retryableStatusCodes is required/ + }, + { + description: 'empty retryableStatusCodes', + config: {...validRetryConfig, retryableStatusCodes: []}, + error: /retry 
policy: retryableStatusCodes must be non-empty/ + }, + { + description: 'unknown status code name', + config: {...validRetryConfig, retryableStatusCodes: ['abcd']}, + error: /retry policy: retryableStatusCodes value not a status code name/ + }, + { + description: 'out of range status code number', + config: {...validRetryConfig, retryableStatusCodes: [12345]}, + error: /retry policy: retryableStatusCodes value not in status code range/ + } +]; + +const validHedgingConfig = { + maxAttempts: 2 +}; + +const HEDGING_TEST_CASES: TestCase[] = [ + { + description: 'omitted maxAttempts', + config: {}, + error: /hedging policy: maxAttempts must be an integer at least 2/ + }, + { + description: 'a low maxAttempts', + config: {...validHedgingConfig, maxAttempts: 1}, + error: /hedging policy: maxAttempts must be an integer at least 2/ + }, + { + description: 'a non-numeric hedgingDelay', + config: {...validHedgingConfig, hedgingDelay: 'abcs'}, + error: /hedging policy: hedgingDelay must be a string consisting of a positive integer followed by s/ + }, + { + description: 'a hedgingDelay without an s', + config: {...validHedgingConfig, hedgingDelay: '123'}, + error: /hedging policy: hedgingDelay must be a string consisting of a positive integer followed by s/ + }, + { + description: 'unknown status code name', + config: {...validHedgingConfig, nonFatalStatusCodes: ['abcd']}, + error: /hedging policy: nonFatalStatusCodes value not a status code name/ + }, + { + description: 'out of range status code number', + config: {...validHedgingConfig, nonFatalStatusCodes: [12345]}, + error: /hedging policy: nonFatalStatusCodes value not in status code range/ + } +]; + +const validThrottlingConfig = { + maxTokens: 100, + tokenRatio: 0.1 +}; + +const THROTTLING_TEST_CASES: TestCase[] = [ + { + description: 'omitted maxTokens', + config: {tokenRatio: 0.1}, + error: /retryThrottling: maxTokens must be a number in \(0, 1000\]/ + }, + { + description: 'a large maxTokens', + config: {...validThrottlingConfig, maxTokens: 1001}, + error: /retryThrottling: maxTokens must be a number in \(0, 1000\]/ + }, + { + description: 'zero maxTokens', + config: {...validThrottlingConfig, maxTokens: 0}, + error: /retryThrottling: maxTokens must be a number in \(0, 1000\]/ + }, + { + description: 'omitted tokenRatio', + config: {maxTokens: 100}, + error: /retryThrottling: tokenRatio must be a number greater than 0/ + }, + { + description: 'zero tokenRatio', + config: {...validThrottlingConfig, tokenRatio: 0}, + error: /retryThrottling: tokenRatio must be a number greater than 0/ + } +]; + +describe('Retry configs', () => { + describe('Retry', () => { + it('Should accept a valid config', () => { + assert.doesNotThrow(() => { + validateServiceConfig(createRetryServiceConfig(validRetryConfig)); + }); + }); + for (const testCase of RETRY_TEST_CASES) { + it(`Should reject ${testCase.description}`, () => { + assert.throws(() => { + validateServiceConfig(createRetryServiceConfig(testCase.config)); + }, testCase.error); + }); + } + }); + describe('Hedging', () => { + it('Should accept valid configs', () => { + assert.doesNotThrow(() => { + validateServiceConfig(createHedgingServiceConfig(validHedgingConfig)); + }); + assert.doesNotThrow(() => { + validateServiceConfig(createHedgingServiceConfig({...validHedgingConfig, hedgingDelay: '1s'})); + }); + assert.doesNotThrow(() => { + validateServiceConfig(createHedgingServiceConfig({...validHedgingConfig, nonFatalStatusCodes: [14, 'RESOURCE_EXHAUSTED']})); + }); + }); + for (const testCase of 
HEDGING_TEST_CASES) { + it(`Should reject ${testCase.description}`, () => { + assert.throws(() => { + validateServiceConfig(createHedgingServiceConfig(testCase.config)); + }, testCase.error); + }); + } + }); + describe('Throttling', () => { + it('Should accept a valid config', () => { + assert.doesNotThrow(() => { + validateServiceConfig(createThrottlingServiceConfig(validThrottlingConfig)); + }); + }); + for (const testCase of THROTTLING_TEST_CASES) { + it(`Should reject ${testCase.description}`, () => { + assert.throws(() => { + validateServiceConfig(createThrottlingServiceConfig(testCase.config)); + }, testCase.error); + }); + } + }); +}); \ No newline at end of file diff --git a/packages/grpc-js/test/test-retry.ts b/packages/grpc-js/test/test-retry.ts new file mode 100644 index 000000000..66c0f7941 --- /dev/null +++ b/packages/grpc-js/test/test-retry.ts @@ -0,0 +1,364 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import * as path from 'path'; +import * as grpc from '../src'; +import { loadProtoFile } from './common'; + +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const EchoService = loadProtoFile(protoFile) + .EchoService as grpc.ServiceClientConstructor; + +const serviceImpl = { + echo: (call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) => { + const succeedOnRetryAttempt = call.metadata.get('succeed-on-retry-attempt'); + const previousAttempts = call.metadata.get('grpc-previous-rpc-attempts'); + if (succeedOnRetryAttempt.length === 0 || (previousAttempts.length > 0 && previousAttempts[0] === succeedOnRetryAttempt[0])) { + callback(null, call.request); + } else { + const statusCode = call.metadata.get('respond-with-status'); + const code = statusCode[0] ? Number.parseInt(statusCode[0] as string) : grpc.status.UNKNOWN; + callback({ + code: code, + details: `Failed on retry ${previousAttempts[0] ?? 
0}` + }); + } + } +} + +describe('Retries', () => { + let server: grpc.Server; + let port: number; + before((done) => { + server = new grpc.Server(); + server.addService(EchoService.service, serviceImpl); + server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, portNumber) => { + if (error) { + done(error); + return; + } + port = portNumber; + server.start(); + done(); + }); + }); + + after(() => { + server.forceShutdown(); + }); + + describe('Client with retries disabled', () => { + let client: InstanceType; + before(() => { + client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.enable_retries': 0}); + }); + + after(() =>{ + client.close(); + }); + + it('Should be able to make a basic request', (done) => { + client.echo( + { value: 'test value', value2: 3 }, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + + it('Should fail if the server fails the first request', (done) =>{ + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '1'); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.details, 'Failed on retry 0'); + done(); + } + ); + }); + }); + + describe('Client with retries enabled but not configured', () => { + let client: InstanceType; + before(() => { + client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure()); + }); + + after(() =>{ + client.close(); + }); + + it('Should be able to make a basic request', (done) => { + client.echo( + { value: 'test value', value2: 3 }, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + + it('Should fail if the server fails the first request', (done) =>{ + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '1'); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.details, 'Failed on retry 0'); + done(); + } + ); + }); + }); + + describe('Client with retries configured', () => { + let client: InstanceType; + before(() => { + const serviceConfig = { + loadBalancingConfig: [], + methodConfig: [ + { + name: [{ + service: 'EchoService' + }], + retryPolicy: { + maxAttempts: 3, + initialBackoff: '0.1s', + maxBackoff: '10s', + backoffMultiplier: 1.2, + retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED'] + } + } + ] + } + client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)}); + }); + + after(() =>{ + client.close(); + }); + + it('Should be able to make a basic request', (done) => { + client.echo( + { value: 'test value', value2: 3 }, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + + it('Should succeed with few required attempts', (done) => { + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '2'); + metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + 
assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + + it('Should fail with many required attempts', (done) => { + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '4'); + metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.details, 'Failed on retry 2'); + done(); + } + ); + }); + + it('Should fail with a fatal status code', (done) => { + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '2'); + metadata.set('respond-with-status', `${grpc.status.NOT_FOUND}`); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.details, 'Failed on retry 0'); + done(); + } + ); + }); + + it('Should not be able to make more than 5 attempts', (done) => { + const serviceConfig = { + loadBalancingConfig: [], + methodConfig: [ + { + name: [{ + service: 'EchoService' + }], + retryPolicy: { + maxAttempts: 10, + initialBackoff: '0.1s', + maxBackoff: '10s', + backoffMultiplier: 1.2, + retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED'] + } + } + ] + } + const client2 = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)}); + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '6'); + metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`); + client2.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert.strictEqual(error.details, 'Failed on retry 4'); + done(); + } + ); + }) + }); + + describe('Client with hedging configured', () => { + let client: InstanceType; + before(() => { + const serviceConfig = { + loadBalancingConfig: [], + methodConfig: [ + { + name: [{ + service: 'EchoService' + }], + hedgingPolicy: { + maxAttempts: 3, + nonFatalStatusCodes: [14, 'RESOURCE_EXHAUSTED'] + } + } + ] + } + client = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)}); + }); + + after(() =>{ + client.close(); + }); + + it('Should be able to make a basic request', (done) => { + client.echo( + { value: 'test value', value2: 3 }, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + + it('Should succeed with few required attempts', (done) => { + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '2'); + metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + + it('Should fail with many required attempts', (done) => { + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '4'); + metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert(error.details.startsWith('Failed on retry')); + done(); + } + ); + }); 
+ + it('Should fail with a fatal status code', (done) => { + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '2'); + metadata.set('respond-with-status', `${grpc.status.NOT_FOUND}`); + client.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert(error.details.startsWith('Failed on retry')); + done(); + } + ); + }); + + it('Should not be able to make more than 5 attempts', (done) => { + const serviceConfig = { + loadBalancingConfig: [], + methodConfig: [ + { + name: [{ + service: 'EchoService' + }], + hedgingPolicy: { + maxAttempts: 10, + nonFatalStatusCodes: [14, 'RESOURCE_EXHAUSTED'] + } + } + ] + } + const client2 = new EchoService(`localhost:${port}`, grpc.credentials.createInsecure(), {'grpc.service_config': JSON.stringify(serviceConfig)}); + const metadata = new grpc.Metadata(); + metadata.set('succeed-on-retry-attempt', '6'); + metadata.set('respond-with-status', `${grpc.status.RESOURCE_EXHAUSTED}`); + client2.echo( + { value: 'test value', value2: 3 }, + metadata, + (error: grpc.ServiceError, response: any) => { + assert(error); + assert(error.details.startsWith('Failed on retry')); + done(); + } + ); + }) + }); +}); \ No newline at end of file
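Usage sketch (not part of the patch): the tests above configure retries through the `grpc.service_config` channel option, and the new buffer options are plain channel arguments. A minimal client-side example in that same shape, where the target address and the `EchoService` method-config name are illustrative assumptions, and a generated service client would normally stand in for the base `grpc.Client`:

import * as grpc from '@grpc/grpc-js';

// Per-method retry policy plus channel-wide retry throttling, in the shape
// accepted by validateRetryPolicy/validateRetryThrottling in service-config.ts.
const serviceConfig = {
  loadBalancingConfig: [],
  methodConfig: [
    {
      name: [{ service: 'EchoService' }], // illustrative service name
      retryPolicy: {
        maxAttempts: 3, // total attempts are capped at 5 by the client
        initialBackoff: '0.1s',
        maxBackoff: '10s',
        backoffMultiplier: 1.2,
        retryableStatusCodes: [14, 'RESOURCE_EXHAUSTED'],
      },
    },
  ],
  retryThrottling: {
    maxTokens: 100,
    tokenRatio: 0.1,
  },
};

const client = new grpc.Client(
  'localhost:50051', // illustrative address
  grpc.credentials.createInsecure(),
  {
    'grpc.service_config': JSON.stringify(serviceConfig),
    'grpc.retry_buffer_size': 1 << 24,         // total retry buffer; the default is 16 MB
    'grpc.per_rpc_retry_buffer_size': 1 << 20, // per-call retry buffer; the default is 1 MB
    // Set 'grpc.enable_retries': 0 to disable retries entirely.
  }
);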
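Backoff sketch (also illustrative, not part of the patch): getNextRetryBackoffMs() in retrying-call.ts draws each retry delay uniformly from [0, currentBackoff) and then scales currentBackoff by backoffMultiplier, capped at maxBackoff. Restated as a standalone helper that returns the upper bound for each possible retry:

// Upper bounds grow as initialBackoff * backoffMultiplier^(n-1), capped at
// maxBackoff; the actual delay for each retry is a uniform random value below
// that bound, mirroring getNextRetryBackoffMs().
function retryDelayBoundsMs(
  initialBackoffSec: number,
  backoffMultiplier: number,
  maxBackoffSec: number,
  maxAttempts: number
): number[] {
  const bounds: number[] = [];
  let nextBackoffSec = initialBackoffSec;
  // One retry fewer than the attempt limit; total attempts are capped at 5.
  for (let attempt = 1; attempt < Math.min(maxAttempts, 5); attempt++) {
    bounds.push(nextBackoffSec * 1000);
    nextBackoffSec = Math.min(nextBackoffSec * backoffMultiplier, maxBackoffSec);
  }
  return bounds;
}

// With the policy used in test-retry.ts (0.1s initial, 1.2 multiplier, 10s max,
// 3 attempts), the two possible retries wait at most ~100ms and ~120ms.
console.log(retryDelayBoundsMs(0.1, 1.2, 10, 3)); // [100, 120]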