From 339eb37efd5fc6e7da177036913a0840c81c1195 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 26 Apr 2022 10:03:08 -0700 Subject: [PATCH 1/4] grpc-js: Refactor in preparation for retries --- .../grpc-js/src/call-credentials-filter.ts | 86 -- packages/grpc-js/src/call-interface.ts | 169 ++++ packages/grpc-js/src/call-number.ts | 22 + packages/grpc-js/src/call-stream.ts | 882 ------------------ packages/grpc-js/src/call.ts | 2 +- packages/grpc-js/src/channel.ts | 653 +------------ packages/grpc-js/src/client-interceptors.ts | 2 +- packages/grpc-js/src/client.ts | 3 +- packages/grpc-js/src/compression-filter.ts | 4 +- packages/grpc-js/src/deadline-filter.ts | 120 --- packages/grpc-js/src/deadline.ts | 58 ++ packages/grpc-js/src/experimental.ts | 2 +- packages/grpc-js/src/filter-stack.ts | 14 +- packages/grpc-js/src/filter.ts | 12 +- packages/grpc-js/src/index.ts | 5 +- packages/grpc-js/src/internal-channel.ts | 504 ++++++++++ .../src/load-balancer-outlier-detection.ts | 34 +- .../grpc-js/src/load-balancer-pick-first.ts | 2 +- .../grpc-js/src/load-balancer-round-robin.ts | 2 +- packages/grpc-js/src/load-balancing-call.ts | 281 ++++++ .../grpc-js/src/max-message-size-filter.ts | 30 +- packages/grpc-js/src/picker.ts | 23 +- packages/grpc-js/src/resolver-dns.ts | 2 +- packages/grpc-js/src/resolver-ip.ts | 2 +- packages/grpc-js/src/resolver.ts | 2 +- packages/grpc-js/src/resolving-call.ts | 219 +++++ .../grpc-js/src/resolving-load-balancer.ts | 2 +- packages/grpc-js/src/server-call.ts | 5 +- packages/grpc-js/src/service-config.ts | 23 + packages/grpc-js/src/status-builder.ts | 2 +- packages/grpc-js/src/subchannel-call.ts | 504 ++++++++++ packages/grpc-js/src/subchannel.ts | 91 +- packages/grpc-js/test/test-resolver.ts | 2 +- 33 files changed, 1886 insertions(+), 1878 deletions(-) delete mode 100644 packages/grpc-js/src/call-credentials-filter.ts create mode 100644 packages/grpc-js/src/call-interface.ts create mode 100644 packages/grpc-js/src/call-number.ts delete mode 100644 packages/grpc-js/src/call-stream.ts delete mode 100644 packages/grpc-js/src/deadline-filter.ts create mode 100644 packages/grpc-js/src/deadline.ts create mode 100644 packages/grpc-js/src/internal-channel.ts create mode 100644 packages/grpc-js/src/load-balancing-call.ts create mode 100644 packages/grpc-js/src/resolving-call.ts create mode 100644 packages/grpc-js/src/subchannel-call.ts diff --git a/packages/grpc-js/src/call-credentials-filter.ts b/packages/grpc-js/src/call-credentials-filter.ts deleted file mode 100644 index 5c746297e..000000000 --- a/packages/grpc-js/src/call-credentials-filter.ts +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { Call } from './call-stream'; -import { Channel } from './channel'; -import { BaseFilter, Filter, FilterFactory } from './filter'; -import { Metadata } from './metadata'; -import { Status } from './constants'; -import { splitHostPort } from './uri-parser'; -import { ServiceError } from './call'; - -export class CallCredentialsFilter extends BaseFilter implements Filter { - private serviceUrl: string; - constructor( - private readonly channel: Channel, - private readonly stream: Call - ) { - super(); - this.channel = channel; - this.stream = stream; - const splitPath: string[] = stream.getMethod().split('/'); - let serviceName = ''; - /* The standard path format is "/{serviceName}/{methodName}", so if we split - * by '/', the first item should be empty and the second should be the - * service name */ - if (splitPath.length >= 2) { - serviceName = splitPath[1]; - } - const hostname = splitHostPort(stream.getHost())?.host ?? 'localhost'; - /* Currently, call credentials are only allowed on HTTPS connections, so we - * can assume that the scheme is "https" */ - this.serviceUrl = `https://${hostname}/${serviceName}`; - } - - async sendMetadata(metadata: Promise): Promise { - const credentials = this.stream.getCredentials(); - const credsMetadata = credentials.generateMetadata({ - service_url: this.serviceUrl, - }); - const resultMetadata = await metadata; - try { - resultMetadata.merge(await credsMetadata); - } catch (error) { - this.stream.cancelWithStatus( - Status.UNAUTHENTICATED, - `Failed to retrieve auth metadata with error: ${error.message}` - ); - return Promise.reject('Failed to retrieve auth metadata'); - } - if (resultMetadata.get('authorization').length > 1) { - this.stream.cancelWithStatus( - Status.INTERNAL, - '"authorization" metadata cannot have multiple values' - ); - return Promise.reject( - '"authorization" metadata cannot have multiple values' - ); - } - return resultMetadata; - } -} - -export class CallCredentialsFilterFactory - implements FilterFactory { - constructor(private readonly channel: Channel) { - this.channel = channel; - } - - createFilter(callStream: Call): CallCredentialsFilter { - return new CallCredentialsFilter(this.channel, callStream); - } -} diff --git a/packages/grpc-js/src/call-interface.ts b/packages/grpc-js/src/call-interface.ts new file mode 100644 index 000000000..891170fec --- /dev/null +++ b/packages/grpc-js/src/call-interface.ts @@ -0,0 +1,169 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { CallCredentials } from "./call-credentials"; +import { Status } from "./constants"; +import { Deadline } from "./deadline"; +import { Metadata } from "./metadata"; +import { ServerSurfaceCall } from "./server-call"; + +export interface CallStreamOptions { + deadline: Deadline; + flags: number; + host: string; + parentCall: ServerSurfaceCall | null; +} + +export type PartialCallStreamOptions = Partial; + +export interface StatusObject { + code: Status; + details: string; + metadata: Metadata; +} + +export const enum WriteFlags { + BufferHint = 1, + NoCompress = 2, + WriteThrough = 4, +} + +export interface WriteObject { + message: Buffer; + flags?: number; +} + +export interface MetadataListener { + (metadata: Metadata, next: (metadata: Metadata) => void): void; +} + +export interface MessageListener { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (message: any, next: (message: any) => void): void; +} + +export interface StatusListener { + (status: StatusObject, next: (status: StatusObject) => void): void; +} + +export interface FullListener { + onReceiveMetadata: MetadataListener; + onReceiveMessage: MessageListener; + onReceiveStatus: StatusListener; +} + +export type Listener = Partial; + +/** + * An object with methods for handling the responses to a call. + */ +export interface InterceptingListener { + onReceiveMetadata(metadata: Metadata): void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + onReceiveMessage(message: any): void; + onReceiveStatus(status: StatusObject): void; +} + +export function isInterceptingListener( + listener: Listener | InterceptingListener +): listener is InterceptingListener { + return ( + listener.onReceiveMetadata !== undefined && + listener.onReceiveMetadata.length === 1 + ); +} + +export class InterceptingListenerImpl implements InterceptingListener { + private processingMetadata = false; + private hasPendingMessage = false; + private pendingMessage: any; + private processingMessage = false; + private pendingStatus: StatusObject | null = null; + constructor( + private listener: FullListener, + private nextListener: InterceptingListener + ) {} + + private processPendingMessage() { + if (this.hasPendingMessage) { + this.nextListener.onReceiveMessage(this.pendingMessage); + this.pendingMessage = null; + this.hasPendingMessage = false; + } + } + + private processPendingStatus() { + if (this.pendingStatus) { + this.nextListener.onReceiveStatus(this.pendingStatus); + } + } + + onReceiveMetadata(metadata: Metadata): void { + this.processingMetadata = true; + this.listener.onReceiveMetadata(metadata, (metadata) => { + this.processingMetadata = false; + this.nextListener.onReceiveMetadata(metadata); + this.processPendingMessage(); + this.processPendingStatus(); + }); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + onReceiveMessage(message: any): void { + /* If this listener processes messages asynchronously, the last message may + * be reordered with respect to the status */ + this.processingMessage = true; + this.listener.onReceiveMessage(message, (msg) => { + this.processingMessage = false; + if (this.processingMetadata) { + this.pendingMessage = msg; + this.hasPendingMessage = true; + } else { + this.nextListener.onReceiveMessage(msg); + this.processPendingStatus(); + } + }); + } + onReceiveStatus(status: StatusObject): void { + this.listener.onReceiveStatus(status, (processedStatus) => { + if (this.processingMetadata || this.processingMessage) { + this.pendingStatus = processedStatus; + } else { + this.nextListener.onReceiveStatus(processedStatus); + } + }); + } +} + +export interface WriteCallback { + (error?: Error | null): void; +} + +export interface MessageContext { + callback?: WriteCallback; + flags?: number; +} + +export interface Call { + cancelWithStatus(status: Status, details: string): void; + getPeer(): string; + start(metadata: Metadata, listener: InterceptingListener): void; + sendMessageWithContext(context: MessageContext, message: Buffer): void; + startRead(): void; + halfClose(): void; + getCallNumber(): number; + setCredentials(credentials: CallCredentials): void; +} \ No newline at end of file diff --git a/packages/grpc-js/src/call-number.ts b/packages/grpc-js/src/call-number.ts new file mode 100644 index 000000000..48d34fac5 --- /dev/null +++ b/packages/grpc-js/src/call-number.ts @@ -0,0 +1,22 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +let nextCallNumber = 0; + +export function getNextCallNumber() { + return nextCallNumber++; +} \ No newline at end of file diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts deleted file mode 100644 index e8f312752..000000000 --- a/packages/grpc-js/src/call-stream.ts +++ /dev/null @@ -1,882 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import * as http2 from 'http2'; -import * as os from 'os'; - -import { CallCredentials } from './call-credentials'; -import { Propagate, Status } from './constants'; -import { Filter, FilterFactory } from './filter'; -import { FilterStackFactory, FilterStack } from './filter-stack'; -import { Metadata } from './metadata'; -import { StreamDecoder } from './stream-decoder'; -import { ChannelImplementation } from './channel'; -import { SubchannelCallStatsTracker, Subchannel } from './subchannel'; -import * as logging from './logging'; -import { LogVerbosity } from './constants'; -import { ServerSurfaceCall } from './server-call'; - -const TRACER_NAME = 'call_stream'; - -const { - HTTP2_HEADER_STATUS, - HTTP2_HEADER_CONTENT_TYPE, - NGHTTP2_CANCEL, -} = http2.constants; - -/** - * https://nodejs.org/api/errors.html#errors_class_systemerror - */ -interface SystemError extends Error { - address?: string; - code: string; - dest?: string; - errno: number; - info?: object; - message: string; - path?: string; - port?: number; - syscall: string; -} - -/** - * Should do approximately the same thing as util.getSystemErrorName but the - * TypeScript types don't have that function for some reason so I just made my - * own. - * @param errno - */ -function getSystemErrorName(errno: number): string { - for (const [name, num] of Object.entries(os.constants.errno)) { - if (num === errno) { - return name; - } - } - return 'Unknown system error ' + errno; -} - -export type Deadline = Date | number; - -function getMinDeadline(deadlineList: Deadline[]): Deadline { - let minValue = Infinity; - for (const deadline of deadlineList) { - const deadlineMsecs = - deadline instanceof Date ? deadline.getTime() : deadline; - if (deadlineMsecs < minValue) { - minValue = deadlineMsecs; - } - } - return minValue; -} - -export interface CallStreamOptions { - deadline: Deadline; - flags: number; - host: string; - parentCall: ServerSurfaceCall | null; -} - -export type PartialCallStreamOptions = Partial; - -export interface StatusObject { - code: Status; - details: string; - metadata: Metadata; -} - -export const enum WriteFlags { - BufferHint = 1, - NoCompress = 2, - WriteThrough = 4, -} - -export interface WriteObject { - message: Buffer; - flags?: number; -} - -export interface MetadataListener { - (metadata: Metadata, next: (metadata: Metadata) => void): void; -} - -export interface MessageListener { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (message: any, next: (message: any) => void): void; -} - -export interface StatusListener { - (status: StatusObject, next: (status: StatusObject) => void): void; -} - -export interface FullListener { - onReceiveMetadata: MetadataListener; - onReceiveMessage: MessageListener; - onReceiveStatus: StatusListener; -} - -export type Listener = Partial; - -/** - * An object with methods for handling the responses to a call. - */ -export interface InterceptingListener { - onReceiveMetadata(metadata: Metadata): void; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onReceiveMessage(message: any): void; - onReceiveStatus(status: StatusObject): void; -} - -export function isInterceptingListener( - listener: Listener | InterceptingListener -): listener is InterceptingListener { - return ( - listener.onReceiveMetadata !== undefined && - listener.onReceiveMetadata.length === 1 - ); -} - -export class InterceptingListenerImpl implements InterceptingListener { - private processingMetadata = false; - private hasPendingMessage = false; - private pendingMessage: any; - private processingMessage = false; - private pendingStatus: StatusObject | null = null; - constructor( - private listener: FullListener, - private nextListener: InterceptingListener - ) {} - - private processPendingMessage() { - if (this.hasPendingMessage) { - this.nextListener.onReceiveMessage(this.pendingMessage); - this.pendingMessage = null; - this.hasPendingMessage = false; - } - } - - private processPendingStatus() { - if (this.pendingStatus) { - this.nextListener.onReceiveStatus(this.pendingStatus); - } - } - - onReceiveMetadata(metadata: Metadata): void { - this.processingMetadata = true; - this.listener.onReceiveMetadata(metadata, (metadata) => { - this.processingMetadata = false; - this.nextListener.onReceiveMetadata(metadata); - this.processPendingMessage(); - this.processPendingStatus(); - }); - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onReceiveMessage(message: any): void { - /* If this listener processes messages asynchronously, the last message may - * be reordered with respect to the status */ - this.processingMessage = true; - this.listener.onReceiveMessage(message, (msg) => { - this.processingMessage = false; - if (this.processingMetadata) { - this.pendingMessage = msg; - this.hasPendingMessage = true; - } else { - this.nextListener.onReceiveMessage(msg); - this.processPendingStatus(); - } - }); - } - onReceiveStatus(status: StatusObject): void { - this.listener.onReceiveStatus(status, (processedStatus) => { - if (this.processingMetadata || this.processingMessage) { - this.pendingStatus = processedStatus; - } else { - this.nextListener.onReceiveStatus(processedStatus); - } - }); - } -} - -export interface WriteCallback { - (error?: Error | null): void; -} - -export interface MessageContext { - callback?: WriteCallback; - flags?: number; -} - -export interface Call { - cancelWithStatus(status: Status, details: string): void; - getPeer(): string; - start(metadata: Metadata, listener: InterceptingListener): void; - sendMessageWithContext(context: MessageContext, message: Buffer): void; - startRead(): void; - halfClose(): void; - - getDeadline(): Deadline; - getCredentials(): CallCredentials; - setCredentials(credentials: CallCredentials): void; - getMethod(): string; - getHost(): string; -} - -export class Http2CallStream implements Call { - credentials: CallCredentials; - filterStack: FilterStack; - private http2Stream: http2.ClientHttp2Stream | null = null; - private pendingRead = false; - private isWriteFilterPending = false; - private pendingWrite: Buffer | null = null; - private pendingWriteCallback: WriteCallback | null = null; - private writesClosed = false; - - private decoder = new StreamDecoder(); - - private isReadFilterPending = false; - private canPush = false; - /** - * Indicates that an 'end' event has come from the http2 stream, so there - * will be no more data events. - */ - private readsClosed = false; - - private statusOutput = false; - - private unpushedReadMessages: Buffer[] = []; - private unfilteredReadMessages: Buffer[] = []; - - // Status code mapped from :status. To be used if grpc-status is not received - private mappedStatusCode: Status = Status.UNKNOWN; - - // This is populated (non-null) if and only if the call has ended - private finalStatus: StatusObject | null = null; - - private subchannel: Subchannel | null = null; - private disconnectListener: () => void; - - private listener: InterceptingListener | null = null; - - private internalError: SystemError | null = null; - - private configDeadline: Deadline = Infinity; - - private statusWatchers: ((status: StatusObject) => void)[] = []; - private streamEndWatchers: ((success: boolean) => void)[] = []; - - private callStatsTracker: SubchannelCallStatsTracker | null = null; - - constructor( - private readonly methodName: string, - private readonly channel: ChannelImplementation, - private readonly options: CallStreamOptions, - filterStackFactory: FilterStackFactory, - private readonly channelCallCredentials: CallCredentials, - private readonly callNumber: number - ) { - this.filterStack = filterStackFactory.createFilter(this); - this.credentials = channelCallCredentials; - this.disconnectListener = () => { - this.endCall({ - code: Status.UNAVAILABLE, - details: 'Connection dropped', - metadata: new Metadata(), - }); - }; - if ( - this.options.parentCall && - this.options.flags & Propagate.CANCELLATION - ) { - this.options.parentCall.on('cancelled', () => { - this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call'); - }); - } - } - - private outputStatus() { - /* Precondition: this.finalStatus !== null */ - if (this.listener && !this.statusOutput) { - this.statusOutput = true; - const filteredStatus = this.filterStack.receiveTrailers( - this.finalStatus! - ); - this.trace( - 'ended with status: code=' + - filteredStatus.code + - ' details="' + - filteredStatus.details + - '"' - ); - this.statusWatchers.forEach(watcher => watcher(filteredStatus)); - /* We delay the actual action of bubbling up the status to insulate the - * cleanup code in this class from any errors that may be thrown in the - * upper layers as a result of bubbling up the status. In particular, - * if the status is not OK, the "error" event may be emitted - * synchronously at the top level, which will result in a thrown error if - * the user does not handle that event. */ - process.nextTick(() => { - this.listener?.onReceiveStatus(filteredStatus); - }); - if (this.subchannel) { - this.subchannel.callUnref(); - this.subchannel.removeDisconnectListener(this.disconnectListener); - } - } - } - - private trace(text: string): void { - logging.trace( - LogVerbosity.DEBUG, - TRACER_NAME, - '[' + this.callNumber + '] ' + text - ); - } - - /** - * On first call, emits a 'status' event with the given StatusObject. - * Subsequent calls are no-ops. - * @param status The status of the call. - */ - private endCall(status: StatusObject): void { - /* If the status is OK and a new status comes in (e.g. from a - * deserialization failure), that new status takes priority */ - if (this.finalStatus === null || this.finalStatus.code === Status.OK) { - this.finalStatus = status; - this.maybeOutputStatus(); - } - this.destroyHttp2Stream(); - } - - private maybeOutputStatus() { - if (this.finalStatus !== null) { - /* The combination check of readsClosed and that the two message buffer - * arrays are empty checks that there all incoming data has been fully - * processed */ - if ( - this.finalStatus.code !== Status.OK || - (this.readsClosed && - this.unpushedReadMessages.length === 0 && - this.unfilteredReadMessages.length === 0 && - !this.isReadFilterPending) - ) { - this.outputStatus(); - } - } - } - - private push(message: Buffer): void { - this.trace( - 'pushing to reader message of length ' + - (message instanceof Buffer ? message.length : null) - ); - this.canPush = false; - process.nextTick(() => { - /* If we have already output the status any later messages should be - * ignored, and can cause out-of-order operation errors higher up in the - * stack. Checking as late as possible here to avoid any race conditions. - */ - if (this.statusOutput) { - return; - } - this.listener?.onReceiveMessage(message); - this.maybeOutputStatus(); - }); - } - - private handleFilterError(error: Error) { - this.cancelWithStatus(Status.INTERNAL, error.message); - } - - private handleFilteredRead(message: Buffer) { - /* If we the call has already ended with an error, we don't want to do - * anything with this message. Dropping it on the floor is correct - * behavior */ - if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) { - this.maybeOutputStatus(); - return; - } - this.isReadFilterPending = false; - if (this.canPush) { - this.http2Stream!.pause(); - this.push(message); - } else { - this.trace( - 'unpushedReadMessages.push message of length ' + message.length - ); - this.unpushedReadMessages.push(message); - } - if (this.unfilteredReadMessages.length > 0) { - /* nextMessage is guaranteed not to be undefined because - unfilteredReadMessages is non-empty */ - const nextMessage = this.unfilteredReadMessages.shift()!; - this.filterReceivedMessage(nextMessage); - } - } - - private filterReceivedMessage(framedMessage: Buffer) { - /* If we the call has already ended with an error, we don't want to do - * anything with this message. Dropping it on the floor is correct - * behavior */ - if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) { - this.maybeOutputStatus(); - return; - } - this.trace('filterReceivedMessage of length ' + framedMessage.length); - this.isReadFilterPending = true; - this.filterStack - .receiveMessage(Promise.resolve(framedMessage)) - .then( - this.handleFilteredRead.bind(this), - this.handleFilterError.bind(this) - ); - } - - private tryPush(messageBytes: Buffer): void { - if (this.isReadFilterPending) { - this.trace( - 'unfilteredReadMessages.push message of length ' + - (messageBytes && messageBytes.length) - ); - this.unfilteredReadMessages.push(messageBytes); - } else { - this.filterReceivedMessage(messageBytes); - } - } - - private handleTrailers(headers: http2.IncomingHttpHeaders) { - this.streamEndWatchers.forEach(watcher => watcher(true)); - let headersString = ''; - for (const header of Object.keys(headers)) { - headersString += '\t\t' + header + ': ' + headers[header] + '\n'; - } - this.trace('Received server trailers:\n' + headersString); - let metadata: Metadata; - try { - metadata = Metadata.fromHttp2Headers(headers); - } catch (e) { - metadata = new Metadata(); - } - const metadataMap = metadata.getMap(); - let code: Status = this.mappedStatusCode; - if ( - code === Status.UNKNOWN && - typeof metadataMap['grpc-status'] === 'string' - ) { - const receivedStatus = Number(metadataMap['grpc-status']); - if (receivedStatus in Status) { - code = receivedStatus; - this.trace('received status code ' + receivedStatus + ' from server'); - } - metadata.remove('grpc-status'); - } - let details = ''; - if (typeof metadataMap['grpc-message'] === 'string') { - details = decodeURI(metadataMap['grpc-message']); - metadata.remove('grpc-message'); - this.trace( - 'received status details string "' + details + '" from server' - ); - } - const status: StatusObject = { code, details, metadata }; - // This is a no-op if the call was already ended when handling headers. - this.endCall(status); - } - - private writeMessageToStream(message: Buffer, callback: WriteCallback) { - this.callStatsTracker?.addMessageSent(); - this.http2Stream!.write(message, callback); - } - - attachHttp2Stream( - stream: http2.ClientHttp2Stream, - subchannel: Subchannel, - extraFilters: Filter[], - callStatsTracker: SubchannelCallStatsTracker - ): void { - this.filterStack.push(extraFilters); - if (this.finalStatus !== null) { - stream.close(NGHTTP2_CANCEL); - } else { - this.trace( - 'attachHttp2Stream from subchannel ' + subchannel.getAddress() - ); - this.http2Stream = stream; - this.subchannel = subchannel; - this.callStatsTracker = callStatsTracker; - subchannel.addDisconnectListener(this.disconnectListener); - subchannel.callRef(); - stream.on('response', (headers, flags) => { - let headersString = ''; - for (const header of Object.keys(headers)) { - headersString += '\t\t' + header + ': ' + headers[header] + '\n'; - } - this.trace('Received server headers:\n' + headersString); - switch (headers[':status']) { - // TODO(murgatroid99): handle 100 and 101 - case 400: - this.mappedStatusCode = Status.INTERNAL; - break; - case 401: - this.mappedStatusCode = Status.UNAUTHENTICATED; - break; - case 403: - this.mappedStatusCode = Status.PERMISSION_DENIED; - break; - case 404: - this.mappedStatusCode = Status.UNIMPLEMENTED; - break; - case 429: - case 502: - case 503: - case 504: - this.mappedStatusCode = Status.UNAVAILABLE; - break; - default: - this.mappedStatusCode = Status.UNKNOWN; - } - - if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) { - this.handleTrailers(headers); - } else { - let metadata: Metadata; - try { - metadata = Metadata.fromHttp2Headers(headers); - } catch (error) { - this.endCall({ - code: Status.UNKNOWN, - details: error.message, - metadata: new Metadata(), - }); - return; - } - try { - const finalMetadata = this.filterStack.receiveMetadata(metadata); - this.listener?.onReceiveMetadata(finalMetadata); - } catch (error) { - this.endCall({ - code: Status.UNKNOWN, - details: error.message, - metadata: new Metadata(), - }); - } - } - }); - stream.on('trailers', this.handleTrailers.bind(this)); - stream.on('data', (data: Buffer) => { - this.trace('receive HTTP/2 data frame of length ' + data.length); - const messages = this.decoder.write(data); - - for (const message of messages) { - this.trace('parsed message of length ' + message.length); - this.callStatsTracker!.addMessageReceived(); - this.tryPush(message); - } - }); - stream.on('end', () => { - this.readsClosed = true; - this.maybeOutputStatus(); - }); - stream.on('close', () => { - /* Use process.next tick to ensure that this code happens after any - * "error" event that may be emitted at about the same time, so that - * we can bubble up the error message from that event. */ - process.nextTick(() => { - this.trace('HTTP/2 stream closed with code ' + stream.rstCode); - /* If we have a final status with an OK status code, that means that - * we have received all of the messages and we have processed the - * trailers and the call completed successfully, so it doesn't matter - * how the stream ends after that */ - if (this.finalStatus?.code === Status.OK) { - return; - } - let code: Status; - let details = ''; - switch (stream.rstCode) { - case http2.constants.NGHTTP2_NO_ERROR: - /* If we get a NO_ERROR code and we already have a status, the - * stream completed properly and we just haven't fully processed - * it yet */ - if (this.finalStatus !== null) { - return; - } - code = Status.INTERNAL; - details = `Received RST_STREAM with code ${stream.rstCode}`; - break; - case http2.constants.NGHTTP2_REFUSED_STREAM: - code = Status.UNAVAILABLE; - details = 'Stream refused by server'; - break; - case http2.constants.NGHTTP2_CANCEL: - code = Status.CANCELLED; - details = 'Call cancelled'; - break; - case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM: - code = Status.RESOURCE_EXHAUSTED; - details = 'Bandwidth exhausted or memory limit exceeded'; - break; - case http2.constants.NGHTTP2_INADEQUATE_SECURITY: - code = Status.PERMISSION_DENIED; - details = 'Protocol not secure enough'; - break; - case http2.constants.NGHTTP2_INTERNAL_ERROR: - code = Status.INTERNAL; - if (this.internalError === null) { - /* This error code was previously handled in the default case, and - * there are several instances of it online, so I wanted to - * preserve the original error message so that people find existing - * information in searches, but also include the more recognizable - * "Internal server error" message. */ - details = `Received RST_STREAM with code ${stream.rstCode} (Internal server error)`; - } else { - if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') { - code = Status.UNAVAILABLE; - details = this.internalError.message; - } else { - /* The "Received RST_STREAM with code ..." error is preserved - * here for continuity with errors reported online, but the - * error message at the end will probably be more relevant in - * most cases. */ - details = `Received RST_STREAM with code ${stream.rstCode} triggered by internal client error: ${this.internalError.message}`; - } - } - break; - default: - code = Status.INTERNAL; - details = `Received RST_STREAM with code ${stream.rstCode}`; - } - // This is a no-op if trailers were received at all. - // This is OK, because status codes emitted here correspond to more - // catastrophic issues that prevent us from receiving trailers in the - // first place. - this.endCall({ code, details, metadata: new Metadata() }); - }); - }); - stream.on('error', (err: SystemError) => { - /* We need an error handler here to stop "Uncaught Error" exceptions - * from bubbling up. However, errors here should all correspond to - * "close" events, where we will handle the error more granularly */ - /* Specifically looking for stream errors that were *not* constructed - * from a RST_STREAM response here: - * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267 - */ - if (err.code !== 'ERR_HTTP2_STREAM_ERROR') { - this.trace( - 'Node error event: message=' + - err.message + - ' code=' + - err.code + - ' errno=' + - getSystemErrorName(err.errno) + - ' syscall=' + - err.syscall - ); - this.internalError = err; - } - this.streamEndWatchers.forEach(watcher => watcher(false)); - }); - if (!this.pendingRead) { - stream.pause(); - } - if (this.pendingWrite) { - if (!this.pendingWriteCallback) { - throw new Error('Invalid state in write handling code'); - } - this.trace( - 'sending data chunk of length ' + - this.pendingWrite.length + - ' (deferred)' - ); - try { - this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback); - } catch (error) { - this.endCall({ - code: Status.UNAVAILABLE, - details: `Write failed with error ${error.message}`, - metadata: new Metadata() - }); - } - } - this.maybeCloseWrites(); - } - } - - start(metadata: Metadata, listener: InterceptingListener) { - this.trace('Sending metadata'); - this.listener = listener; - this.channel._startCallStream(this, metadata); - this.maybeOutputStatus(); - } - - private destroyHttp2Stream() { - // The http2 stream could already have been destroyed if cancelWithStatus - // is called in response to an internal http2 error. - if (this.http2Stream !== null && !this.http2Stream.destroyed) { - /* If the call has ended with an OK status, communicate that when closing - * the stream, partly to avoid a situation in which we detect an error - * RST_STREAM as a result after we have the status */ - let code: number; - if (this.finalStatus?.code === Status.OK) { - code = http2.constants.NGHTTP2_NO_ERROR; - } else { - code = http2.constants.NGHTTP2_CANCEL; - } - this.trace('close http2 stream with code ' + code); - this.http2Stream.close(code); - } - } - - cancelWithStatus(status: Status, details: string): void { - this.trace( - 'cancelWithStatus code: ' + status + ' details: "' + details + '"' - ); - this.endCall({ code: status, details, metadata: new Metadata() }); - } - - getDeadline(): Deadline { - const deadlineList = [this.options.deadline]; - if (this.options.parentCall && this.options.flags & Propagate.DEADLINE) { - deadlineList.push(this.options.parentCall.getDeadline()); - } - if (this.configDeadline) { - deadlineList.push(this.configDeadline); - } - return getMinDeadline(deadlineList); - } - - getCredentials(): CallCredentials { - return this.credentials; - } - - setCredentials(credentials: CallCredentials): void { - this.credentials = this.channelCallCredentials.compose(credentials); - } - - getStatus(): StatusObject | null { - return this.finalStatus; - } - - getPeer(): string { - return this.subchannel?.getAddress() ?? this.channel.getTarget(); - } - - getMethod(): string { - return this.methodName; - } - - getHost(): string { - return this.options.host; - } - - setConfigDeadline(configDeadline: Deadline) { - this.configDeadline = configDeadline; - } - - addStatusWatcher(watcher: (status: StatusObject) => void) { - this.statusWatchers.push(watcher); - } - - addStreamEndWatcher(watcher: (success: boolean) => void) { - this.streamEndWatchers.push(watcher); - } - - addFilters(extraFilters: Filter[]) { - this.filterStack.push(extraFilters); - } - - getCallNumber() { - return this.callNumber; - } - - startRead() { - /* If the stream has ended with an error, we should not emit any more - * messages and we should communicate that the stream has ended */ - if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) { - this.readsClosed = true; - this.maybeOutputStatus(); - return; - } - this.canPush = true; - if (this.http2Stream === null) { - this.pendingRead = true; - } else { - if (this.unpushedReadMessages.length > 0) { - const nextMessage: Buffer = this.unpushedReadMessages.shift()!; - this.push(nextMessage); - return; - } - /* Only resume reading from the http2Stream if we don't have any pending - * messages to emit */ - this.http2Stream.resume(); - } - } - - private maybeCloseWrites() { - if ( - this.writesClosed && - !this.isWriteFilterPending && - this.http2Stream !== null - ) { - this.trace('calling end() on HTTP/2 stream'); - this.http2Stream.end(); - } - } - - sendMessageWithContext(context: MessageContext, message: Buffer) { - this.trace('write() called with message of length ' + message.length); - const writeObj: WriteObject = { - message, - flags: context.flags, - }; - const cb: WriteCallback = (error?: Error | null) => { - let code: Status = Status.UNAVAILABLE; - if ((error as NodeJS.ErrnoException)?.code === 'ERR_STREAM_WRITE_AFTER_END') { - code = Status.INTERNAL; - } - if (error) { - this.cancelWithStatus(code, `Write error: ${error.message}`); - } - context.callback?.(); - }; - this.isWriteFilterPending = true; - this.filterStack.sendMessage(Promise.resolve(writeObj)).then((message) => { - this.isWriteFilterPending = false; - if (this.http2Stream === null) { - this.trace( - 'deferring writing data chunk of length ' + message.message.length - ); - this.pendingWrite = message.message; - this.pendingWriteCallback = cb; - } else { - this.trace('sending data chunk of length ' + message.message.length); - try { - this.writeMessageToStream(message.message, cb); - } catch (error) { - this.endCall({ - code: Status.UNAVAILABLE, - details: `Write failed with error ${error.message}`, - metadata: new Metadata() - }); - } - this.maybeCloseWrites(); - } - }, this.handleFilterError.bind(this)); - } - - halfClose() { - this.trace('end() called'); - this.writesClosed = true; - this.maybeCloseWrites(); - } -} diff --git a/packages/grpc-js/src/call.ts b/packages/grpc-js/src/call.ts index fcc3159db..c587a195d 100644 --- a/packages/grpc-js/src/call.ts +++ b/packages/grpc-js/src/call.ts @@ -18,7 +18,7 @@ import { EventEmitter } from 'events'; import { Duplex, Readable, Writable } from 'stream'; -import { StatusObject, MessageContext } from './call-stream'; +import { StatusObject, MessageContext } from './call-interface'; import { Status } from './constants'; import { EmitterAugmentation1 } from './events'; import { Metadata } from './metadata'; diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 635b52d6f..aaf14bb22 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -15,57 +15,15 @@ * */ -import { - Deadline, - Call, - Http2CallStream, - CallStreamOptions, -} from './call-stream'; import { ChannelCredentials } from './channel-credentials'; import { ChannelOptions } from './channel-options'; -import { ResolvingLoadBalancer } from './resolving-load-balancer'; -import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; -import { ChannelControlHelper } from './load-balancer'; -import { UnavailablePicker, Picker, PickResultType } from './picker'; -import { Metadata } from './metadata'; -import { Status, LogVerbosity, Propagate } from './constants'; -import { FilterStackFactory } from './filter-stack'; -import { CallCredentialsFilterFactory } from './call-credentials-filter'; -import { DeadlineFilterFactory } from './deadline-filter'; -import { CompressionFilterFactory } from './compression-filter'; -import { - CallConfig, - ConfigSelector, - getDefaultAuthority, - mapUriDefaultScheme, -} from './resolver'; -import { trace, log } from './logging'; -import { SubchannelAddress } from './subchannel-address'; -import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; -import { mapProxyName } from './http_proxy'; -import { GrpcUri, parseUri, uriToString } from './uri-parser'; import { ServerSurfaceCall } from './server-call'; -import { Filter } from './filter'; import { ConnectivityState } from './connectivity-state'; -import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; -import { Subchannel } from './subchannel'; - -/** - * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args - */ -const MAX_TIMEOUT_TIME = 2147483647; - -let nextCallNumber = 0; - -function getNewCallNumber(): number { - const callNumber = nextCallNumber; - nextCallNumber += 1; - if (nextCallNumber >= Number.MAX_SAFE_INTEGER) { - nextCallNumber = 0; - } - return callNumber; -} +import { ChannelRef } from './channelz'; +import { Call } from './call-interface'; +import { InternalChannel } from './internal-channel'; +import { Deadline } from './deadline'; /** * An interface that represents a communication channel to a server specified @@ -132,57 +90,14 @@ export interface Channel { ): Call; } -interface ConnectivityStateWatcher { - currentState: ConnectivityState; - timer: NodeJS.Timeout | null; - callback: (error?: Error) => void; -} - export class ChannelImplementation implements Channel { - private resolvingLoadBalancer: ResolvingLoadBalancer; - private subchannelPool: SubchannelPool; - private connectivityState: ConnectivityState = ConnectivityState.IDLE; - private currentPicker: Picker = new UnavailablePicker(); - /** - * Calls queued up to get a call config. Should only be populated before the - * first time the resolver returns a result, which includes the ConfigSelector. - */ - private configSelectionQueue: Array<{ - callStream: Http2CallStream; - callMetadata: Metadata; - }> = []; - private pickQueue: Array<{ - callStream: Http2CallStream; - callMetadata: Metadata; - callConfig: CallConfig; - dynamicFilters: Filter[]; - }> = []; - private connectivityStateWatchers: ConnectivityStateWatcher[] = []; - private defaultAuthority: string; - private filterStackFactory: FilterStackFactory; - private target: GrpcUri; - /** - * This timer does not do anything on its own. Its purpose is to hold the - * event loop open while there are any pending calls for the channel that - * have not yet been assigned to specific subchannels. In other words, - * the invariant is that callRefTimer is reffed if and only if pickQueue - * is non-empty. - */ - private callRefTimer: NodeJS.Timer; - private configSelector: ConfigSelector | null = null; - // Channelz info - private readonly channelzEnabled: boolean = true; - private originalTarget: string; - private channelzRef: ChannelRef; - private channelzTrace: ChannelzTrace; - private callTracker = new ChannelzCallTracker(); - private childrenTracker = new ChannelzChildrenTracker(); + private internalChannel: InternalChannel; constructor( target: string, - private readonly credentials: ChannelCredentials, - private readonly options: ChannelOptions + credentials: ChannelCredentials, + options: ChannelOptions ) { if (typeof target !== 'string') { throw new TypeError('Channel target must be a string'); @@ -197,497 +112,20 @@ export class ChannelImplementation implements Channel { throw new TypeError('Channel options must be an object'); } } - this.originalTarget = target; - const originalTargetUri = parseUri(target); - if (originalTargetUri === null) { - throw new Error(`Could not parse target name "${target}"`); - } - /* This ensures that the target has a scheme that is registered with the - * resolver */ - const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri); - if (defaultSchemeMapResult === null) { - throw new Error( - `Could not find a default scheme for target name "${target}"` - ); - } - - this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); - this.callRefTimer.unref?.(); - - if (this.options['grpc.enable_channelz'] === 0) { - this.channelzEnabled = false; - } - - this.channelzTrace = new ChannelzTrace(); - if (this.channelzEnabled) { - this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo()); - this.channelzTrace.addTrace('CT_INFO', 'Channel created'); - } else { - // Dummy channelz ref that will never be used - this.channelzRef = { - kind: 'channel', - id: -1, - name: '' - }; - } - - if (this.options['grpc.default_authority']) { - this.defaultAuthority = this.options['grpc.default_authority'] as string; - } else { - this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult); - } - const proxyMapResult = mapProxyName(defaultSchemeMapResult, options); - this.target = proxyMapResult.target; - this.options = Object.assign({}, this.options, proxyMapResult.extraOptions); - - /* The global boolean parameter to getSubchannelPool has the inverse meaning to what - * the grpc.use_local_subchannel_pool channel option means. */ - this.subchannelPool = getSubchannelPool( - (options['grpc.use_local_subchannel_pool'] ?? 0) === 0 - ); - const channelControlHelper: ChannelControlHelper = { - createSubchannel: ( - subchannelAddress: SubchannelAddress, - subchannelArgs: ChannelOptions - ) => { - const subchannel = this.subchannelPool.getOrCreateSubchannel( - this.target, - subchannelAddress, - Object.assign({}, this.options, subchannelArgs), - this.credentials - ); - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); - } - return subchannel; - }, - updateState: (connectivityState: ConnectivityState, picker: Picker) => { - this.currentPicker = picker; - const queueCopy = this.pickQueue.slice(); - this.pickQueue = []; - this.callRefTimerUnref(); - for (const { callStream, callMetadata, callConfig, dynamicFilters } of queueCopy) { - this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); - } - this.updateState(connectivityState); - }, - requestReresolution: () => { - // This should never be called. - throw new Error( - 'Resolving load balancer should never call requestReresolution' - ); - }, - addChannelzChild: (child: ChannelRef | SubchannelRef) => { - if (this.channelzEnabled) { - this.childrenTracker.refChild(child); - } - }, - removeChannelzChild: (child: ChannelRef | SubchannelRef) => { - if (this.channelzEnabled) { - this.childrenTracker.unrefChild(child); - } - } - }; - this.resolvingLoadBalancer = new ResolvingLoadBalancer( - this.target, - channelControlHelper, - options, - (configSelector) => { - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); - } - this.configSelector = configSelector; - /* We process the queue asynchronously to ensure that the corresponding - * load balancer update has completed. */ - process.nextTick(() => { - const localQueue = this.configSelectionQueue; - this.configSelectionQueue = []; - this.callRefTimerUnref(); - for (const { callStream, callMetadata } of localQueue) { - this.tryGetConfig(callStream, callMetadata); - } - this.configSelectionQueue = []; - }); - }, - (status) => { - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); - } - if (this.configSelectionQueue.length > 0) { - this.trace('Name resolution failed with calls queued for config selection'); - } - const localQueue = this.configSelectionQueue; - this.configSelectionQueue = []; - this.callRefTimerUnref(); - for (const { callStream, callMetadata } of localQueue) { - if (callMetadata.getOptions().waitForReady) { - this.callRefTimerRef(); - this.configSelectionQueue.push({ callStream, callMetadata }); - } else { - callStream.cancelWithStatus(status.code, status.details); - } - } - } - ); - this.filterStackFactory = new FilterStackFactory([ - new CallCredentialsFilterFactory(this), - new DeadlineFilterFactory(this), - new MaxMessageSizeFilterFactory(this.options), - new CompressionFilterFactory(this, this.options), - ]); - this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); - const error = new Error(); - trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1)); - } - - private getChannelzInfo(): ChannelInfo { - return { - target: this.originalTarget, - state: this.connectivityState, - trace: this.channelzTrace, - callTracker: this.callTracker, - children: this.childrenTracker.getChildLists() - }; - } - - private trace(text: string, verbosityOverride?: LogVerbosity) { - trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text); - } - - private callRefTimerRef() { - // If the hasRef function does not exist, always run the code - if (!this.callRefTimer.hasRef?.()) { - this.trace( - 'callRefTimer.ref | configSelectionQueue.length=' + - this.configSelectionQueue.length + - ' pickQueue.length=' + - this.pickQueue.length - ); - this.callRefTimer.ref?.(); - } - } - - private callRefTimerUnref() { - // If the hasRef function does not exist, always run the code - if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { - this.trace( - 'callRefTimer.unref | configSelectionQueue.length=' + - this.configSelectionQueue.length + - ' pickQueue.length=' + - this.pickQueue.length - ); - this.callRefTimer.unref?.(); - } - } - - private pushPick( - callStream: Http2CallStream, - callMetadata: Metadata, - callConfig: CallConfig, - dynamicFilters: Filter[] - ) { - this.pickQueue.push({ callStream, callMetadata, callConfig, dynamicFilters }); - this.callRefTimerRef(); - } - - /** - * Check the picker output for the given call and corresponding metadata, - * and take any relevant actions. Should not be called while iterating - * over pickQueue. - * @param callStream - * @param callMetadata - */ - private tryPick( - callStream: Http2CallStream, - callMetadata: Metadata, - callConfig: CallConfig, - dynamicFilters: Filter[] - ) { - const pickResult = this.currentPicker.pick({ - metadata: callMetadata, - extraPickInfo: callConfig.pickInformation, - }); - const subchannelString = pickResult.subchannel ? - '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : - '' + pickResult.subchannel; - this.trace( - 'Pick result for call [' + - callStream.getCallNumber() + - ']: ' + - PickResultType[pickResult.pickResultType] + - ' subchannel: ' + - subchannelString + - ' status: ' + - pickResult.status?.code + - ' ' + - pickResult.status?.details - ); - switch (pickResult.pickResultType) { - case PickResultType.COMPLETE: - if (pickResult.subchannel === null) { - callStream.cancelWithStatus( - Status.UNAVAILABLE, - 'Request dropped by load balancing policy' - ); - // End the call with an error - } else { - /* If the subchannel is not in the READY state, that indicates a bug - * somewhere in the load balancer or picker. So, we log an error and - * queue the pick to be tried again later. */ - if ( - pickResult.subchannel!.getConnectivityState() !== - ConnectivityState.READY - ) { - log( - LogVerbosity.ERROR, - 'Error: COMPLETE pick result subchannel ' + - subchannelString + - ' has state ' + - ConnectivityState[pickResult.subchannel!.getConnectivityState()] - ); - this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); - break; - } - /* We need to clone the callMetadata here because the transparent - * retry code in the promise resolution handler use the same - * callMetadata object, so it needs to stay unmodified */ - callStream.filterStack - .sendMetadata(Promise.resolve(callMetadata.clone())) - .then( - (finalMetadata) => { - const subchannelState: ConnectivityState = pickResult.subchannel!.getConnectivityState(); - if (subchannelState === ConnectivityState.READY) { - try { - const pickExtraFilters = pickResult.extraFilterFactories.map(factory => factory.createFilter(callStream)); - pickResult.subchannel?.getRealSubchannel().startCallStream( - finalMetadata, - callStream, - [...dynamicFilters, ...pickExtraFilters] - ); - /* If we reach this point, the call stream has started - * successfully */ - callConfig.onCommitted?.(); - pickResult.onCallStarted?.(); - } catch (error) { - const errorCode = (error as NodeJS.ErrnoException).code; - if (errorCode === 'ERR_HTTP2_GOAWAY_SESSION' || - errorCode === 'ERR_HTTP2_INVALID_SESSION' - ) { - /* An error here indicates that something went wrong with - * the picked subchannel's http2 stream right before we - * tried to start the stream. We are handling a promise - * result here, so this is asynchronous with respect to the - * original tryPick call, so calling it again is not - * recursive. We call tryPick immediately instead of - * queueing this pick again because handling the queue is - * triggered by state changes, and we want to immediately - * check if the state has already changed since the - * previous tryPick call. We do this instead of cancelling - * the stream because the correct behavior may be - * re-queueing instead, based on the logic in the rest of - * tryPick */ - this.trace( - 'Failed to start call on picked subchannel ' + - subchannelString + - ' with error ' + - (error as Error).message + - '. Retrying pick', - LogVerbosity.INFO - ); - this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); - } else { - this.trace( - 'Failed to start call on picked subchanel ' + - subchannelString + - ' with error ' + - (error as Error).message + - '. Ending call', - LogVerbosity.INFO - ); - callStream.cancelWithStatus( - Status.INTERNAL, - `Failed to start HTTP/2 stream with error: ${ - (error as Error).message - }` - ); - } - } - } else { - /* The logic for doing this here is the same as in the catch - * block above */ - this.trace( - 'Picked subchannel ' + - subchannelString + - ' has state ' + - ConnectivityState[subchannelState] + - ' after metadata filters. Retrying pick', - LogVerbosity.INFO - ); - this.tryPick(callStream, callMetadata, callConfig, dynamicFilters); - } - }, - (error: Error & { code: number }) => { - // We assume the error code isn't 0 (Status.OK) - callStream.cancelWithStatus( - typeof error.code === 'number' ? error.code : Status.UNKNOWN, - `Getting metadata from plugin failed with error: ${error.message}` - ); - } - ); - } - break; - case PickResultType.QUEUE: - this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); - break; - case PickResultType.TRANSIENT_FAILURE: - if (callMetadata.getOptions().waitForReady) { - this.pushPick(callStream, callMetadata, callConfig, dynamicFilters); - } else { - callStream.cancelWithStatus( - pickResult.status!.code, - pickResult.status!.details - ); - } - break; - case PickResultType.DROP: - callStream.cancelWithStatus( - pickResult.status!.code, - pickResult.status!.details - ); - break; - default: - throw new Error( - `Invalid state: unknown pickResultType ${pickResult.pickResultType}` - ); - } - } - - private removeConnectivityStateWatcher( - watcherObject: ConnectivityStateWatcher - ) { - const watcherIndex = this.connectivityStateWatchers.findIndex( - (value) => value === watcherObject - ); - if (watcherIndex >= 0) { - this.connectivityStateWatchers.splice(watcherIndex, 1); - } - } - - private updateState(newState: ConnectivityState): void { - trace( - LogVerbosity.DEBUG, - 'connectivity_state', - '(' + this.channelzRef.id + ') ' + - uriToString(this.target) + - ' ' + - ConnectivityState[this.connectivityState] + - ' -> ' + - ConnectivityState[newState] - ); - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); - } - this.connectivityState = newState; - const watchersCopy = this.connectivityStateWatchers.slice(); - for (const watcherObject of watchersCopy) { - if (newState !== watcherObject.currentState) { - if (watcherObject.timer) { - clearTimeout(watcherObject.timer); - } - this.removeConnectivityStateWatcher(watcherObject); - watcherObject.callback(); - } - } - } - - private tryGetConfig(stream: Http2CallStream, metadata: Metadata) { - if (stream.getStatus() !== null) { - /* If the stream has a status, it has already finished and we don't need - * to take any more actions on it. */ - return; - } - if (this.configSelector === null) { - /* This branch will only be taken at the beginning of the channel's life, - * before the resolver ever returns a result. So, the - * ResolvingLoadBalancer may be idle and if so it needs to be kicked - * because it now has a pending request. */ - this.resolvingLoadBalancer.exitIdle(); - this.configSelectionQueue.push({ - callStream: stream, - callMetadata: metadata, - }); - this.callRefTimerRef(); - } else { - const callConfig = this.configSelector(stream.getMethod(), metadata); - if (callConfig.status === Status.OK) { - if (callConfig.methodConfig.timeout) { - const deadline = new Date(); - deadline.setSeconds( - deadline.getSeconds() + callConfig.methodConfig.timeout.seconds - ); - deadline.setMilliseconds( - deadline.getMilliseconds() + - callConfig.methodConfig.timeout.nanos / 1_000_000 - ); - stream.setConfigDeadline(deadline); - // Refreshing the filters makes the deadline filter pick up the new deadline - stream.filterStack.refresh(); - } - if (callConfig.dynamicFilterFactories.length > 0) { - /* These dynamicFilters are the mechanism for implementing gRFC A39: - * https://github.com/grpc/proposal/blob/master/A39-xds-http-filters.md - * We run them here instead of with the rest of the filters because - * that spec says "the xDS HTTP filters will run in between name - * resolution and load balancing". - * - * We use the filter stack here to simplify the multi-filter async - * waterfall logic, but we pass along the underlying list of filters - * to avoid having nested filter stacks when combining it with the - * original filter stack. We do not pass along the original filter - * factory list because these filters may need to persist data - * between sending headers and other operations. */ - const dynamicFilterStackFactory = new FilterStackFactory(callConfig.dynamicFilterFactories); - const dynamicFilterStack = dynamicFilterStackFactory.createFilter(stream); - dynamicFilterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { - this.tryPick(stream, filteredMetadata, callConfig, dynamicFilterStack.getFilters()); - }); - } else { - this.tryPick(stream, metadata, callConfig, []); - } - } else { - stream.cancelWithStatus( - callConfig.status, - 'Failed to route call to method ' + stream.getMethod() - ); - } - } - } - _startCallStream(stream: Http2CallStream, metadata: Metadata) { - this.tryGetConfig(stream, metadata.clone()); + this.internalChannel = new InternalChannel(target, credentials, options); } close() { - this.resolvingLoadBalancer.destroy(); - this.updateState(ConnectivityState.SHUTDOWN); - clearInterval(this.callRefTimer); - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } - - this.subchannelPool.unrefUnusedSubchannels(); + this.internalChannel.close(); } getTarget() { - return uriToString(this.target); + return this.internalChannel.getTarget(); } getConnectivityState(tryToConnect: boolean) { - const connectivityState = this.connectivityState; - if (tryToConnect) { - this.resolvingLoadBalancer.exitIdle(); - } - return connectivityState; + return this.internalChannel.getConnectivityState(tryToConnect); } watchConnectivityState( @@ -695,34 +133,7 @@ export class ChannelImplementation implements Channel { deadline: Date | number, callback: (error?: Error) => void ): void { - if (this.connectivityState === ConnectivityState.SHUTDOWN) { - throw new Error('Channel has been shut down'); - } - let timer = null; - if (deadline !== Infinity) { - const deadlineDate: Date = - deadline instanceof Date ? deadline : new Date(deadline); - const now = new Date(); - if (deadline === -Infinity || deadlineDate <= now) { - process.nextTick( - callback, - new Error('Deadline passed without connectivity state change') - ); - return; - } - timer = setTimeout(() => { - this.removeConnectivityStateWatcher(watcherObject); - callback( - new Error('Deadline passed without connectivity state change') - ); - }, deadlineDate.getTime() - now.getTime()); - } - const watcherObject = { - currentState, - callback, - timer, - }; - this.connectivityStateWatchers.push(watcherObject); + this.internalChannel.watchConnectivityState(currentState, deadline, callback); } /** @@ -731,7 +142,7 @@ export class ChannelImplementation implements Channel { * @returns */ getChannelzRef() { - return this.channelzRef; + return this.internalChannel.getChannelzRef(); } createCall( @@ -749,42 +160,6 @@ export class ChannelImplementation implements Channel { 'Channel#createCall: deadline must be a number or Date' ); } - if (this.connectivityState === ConnectivityState.SHUTDOWN) { - throw new Error('Channel has been shut down'); - } - const callNumber = getNewCallNumber(); - this.trace( - 'createCall [' + - callNumber + - '] method="' + - method + - '", deadline=' + - deadline - ); - const finalOptions: CallStreamOptions = { - deadline: deadline, - flags: propagateFlags ?? Propagate.DEFAULTS, - host: host ?? this.defaultAuthority, - parentCall: parentCall, - }; - const stream: Http2CallStream = new Http2CallStream( - method, - this, - finalOptions, - this.filterStackFactory, - this.credentials._getCallCredentials(), - callNumber - ); - if (this.channelzEnabled) { - this.callTracker.addCallStarted(); - stream.addStatusWatcher(status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); - } - return stream; + return this.internalChannel.createCall(method, deadline, host, parentCall, propagateFlags); } } diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index ddb296ff1..52320b0b3 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -28,7 +28,7 @@ import { isInterceptingListener, MessageContext, Call, -} from './call-stream'; +} from './call-interface'; import { Status } from './constants'; import { Channel } from './channel'; import { CallOptions } from './client'; diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index ed9407cd8..688d8016c 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -29,7 +29,7 @@ import { SurfaceCall, } from './call'; import { CallCredentials } from './call-credentials'; -import { Deadline, StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { Channel, ChannelImplementation } from './channel'; import { ConnectivityState } from './connectivity-state'; import { ChannelCredentials } from './channel-credentials'; @@ -50,6 +50,7 @@ import { ServerWritableStream, ServerDuplexStream, } from './server-call'; +import { Deadline } from './deadline'; const CHANNEL_SYMBOL = Symbol(); const INTERCEPTOR_SYMBOL = Symbol(); diff --git a/packages/grpc-js/src/compression-filter.ts b/packages/grpc-js/src/compression-filter.ts index 40825467e..f87614114 100644 --- a/packages/grpc-js/src/compression-filter.ts +++ b/packages/grpc-js/src/compression-filter.ts @@ -17,7 +17,7 @@ import * as zlib from 'zlib'; -import { Call, WriteObject, WriteFlags } from './call-stream'; +import { WriteObject, WriteFlags } from './call-interface'; import { Channel } from './channel'; import { ChannelOptions } from './channel-options'; import { CompressionAlgorithms } from './compression-algorithms'; @@ -283,7 +283,7 @@ export class CompressionFilterFactory implements FilterFactory { private sharedFilterConfig: SharedCompressionFilterConfig = {}; constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {} - createFilter(callStream: Call): CompressionFilter { + createFilter(): CompressionFilter { return new CompressionFilter(this.options, this.sharedFilterConfig); } } diff --git a/packages/grpc-js/src/deadline-filter.ts b/packages/grpc-js/src/deadline-filter.ts deleted file mode 100644 index 7bdd764f2..000000000 --- a/packages/grpc-js/src/deadline-filter.ts +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import { Call, StatusObject } from './call-stream'; -import { Channel } from './channel'; -import { Status } from './constants'; -import { BaseFilter, Filter, FilterFactory } from './filter'; -import { Metadata } from './metadata'; - -const units: Array<[string, number]> = [ - ['m', 1], - ['S', 1000], - ['M', 60 * 1000], - ['H', 60 * 60 * 1000], -]; - -function getDeadline(deadline: number) { - const now = new Date().getTime(); - const timeoutMs = Math.max(deadline - now, 0); - for (const [unit, factor] of units) { - const amount = timeoutMs / factor; - if (amount < 1e8) { - return String(Math.ceil(amount)) + unit; - } - } - throw new Error('Deadline is too far in the future'); -} - -export class DeadlineFilter extends BaseFilter implements Filter { - private timer: NodeJS.Timer | null = null; - private deadline = Infinity; - constructor( - private readonly channel: Channel, - private readonly callStream: Call - ) { - super(); - this.retreiveDeadline(); - this.runTimer(); - } - - private retreiveDeadline() { - const callDeadline = this.callStream.getDeadline(); - if (callDeadline instanceof Date) { - this.deadline = callDeadline.getTime(); - } else { - this.deadline = callDeadline; - } - } - - private runTimer() { - if (this.timer) { - clearTimeout(this.timer); - } - const now: number = new Date().getTime(); - const timeout = this.deadline - now; - if (timeout <= 0) { - process.nextTick(() => { - this.callStream.cancelWithStatus( - Status.DEADLINE_EXCEEDED, - 'Deadline exceeded' - ); - }); - } else if (this.deadline !== Infinity) { - this.timer = setTimeout(() => { - this.callStream.cancelWithStatus( - Status.DEADLINE_EXCEEDED, - 'Deadline exceeded' - ); - }, timeout); - this.timer.unref?.(); - } - } - - refresh() { - this.retreiveDeadline(); - this.runTimer(); - } - - async sendMetadata(metadata: Promise) { - if (this.deadline === Infinity) { - return metadata; - } - /* The input metadata promise depends on the original channel.connect() - * promise, so when it is complete that implies that the channel is - * connected */ - const finalMetadata = await metadata; - const timeoutString = getDeadline(this.deadline); - finalMetadata.set('grpc-timeout', timeoutString); - return finalMetadata; - } - - receiveTrailers(status: StatusObject) { - if (this.timer) { - clearTimeout(this.timer); - } - return status; - } -} - -export class DeadlineFilterFactory implements FilterFactory { - constructor(private readonly channel: Channel) {} - - createFilter(callStream: Call): DeadlineFilter { - return new DeadlineFilter(this.channel, callStream); - } -} diff --git a/packages/grpc-js/src/deadline.ts b/packages/grpc-js/src/deadline.ts new file mode 100644 index 000000000..58ea0a805 --- /dev/null +++ b/packages/grpc-js/src/deadline.ts @@ -0,0 +1,58 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +export type Deadline = Date | number; + +export function minDeadline(...deadlineList: Deadline[]): Deadline { + let minValue = Infinity; + for (const deadline of deadlineList) { + const deadlineMsecs = + deadline instanceof Date ? deadline.getTime() : deadline; + if (deadlineMsecs < minValue) { + minValue = deadlineMsecs; + } + } + return minValue; +} + +const units: Array<[string, number]> = [ + ['m', 1], + ['S', 1000], + ['M', 60 * 1000], + ['H', 60 * 60 * 1000], +]; + +export function getDeadlineTimeoutString(deadline: Deadline) { + const now = new Date().getTime(); + if (deadline instanceof Date) { + deadline = deadline.getTime(); + } + const timeoutMs = Math.max(deadline - now, 0); + for (const [unit, factor] of units) { + const amount = timeoutMs / factor; + if (amount < 1e8) { + return String(Math.ceil(amount)) + unit; + } + } + throw new Error('Deadline is too far in the future') +} + +export function getRelativeTimeout(deadline: Deadline) { + const deadlineMs = deadline instanceof Date ? deadline.getTime() : deadline; + const now = new Date().getTime(); + return deadlineMs - now; +} \ No newline at end of file diff --git a/packages/grpc-js/src/experimental.ts b/packages/grpc-js/src/experimental.ts index fcafbeb01..f286bbcd6 100644 --- a/packages/grpc-js/src/experimental.ts +++ b/packages/grpc-js/src/experimental.ts @@ -31,7 +31,7 @@ export { PickArgs, PickResultType, } from './picker'; -export { Call as CallStream } from './call-stream'; +export { Call as CallStream } from './call-interface'; export { Filter, BaseFilter, FilterFactory } from './filter'; export { FilterStackFactory } from './filter-stack'; export { registerAdminService } from './admin'; diff --git a/packages/grpc-js/src/filter-stack.ts b/packages/grpc-js/src/filter-stack.ts index a9e754428..f44c43839 100644 --- a/packages/grpc-js/src/filter-stack.ts +++ b/packages/grpc-js/src/filter-stack.ts @@ -15,14 +15,14 @@ * */ -import { Call, StatusObject, WriteObject } from './call-stream'; +import { StatusObject, WriteObject } from './call-interface'; import { Filter, FilterFactory } from './filter'; import { Metadata } from './metadata'; export class FilterStack implements Filter { constructor(private readonly filters: Filter[]) {} - sendMetadata(metadata: Promise) { + sendMetadata(metadata: Promise): Promise { let result: Promise = metadata; for (let i = 0; i < this.filters.length; i++) { @@ -72,12 +72,6 @@ export class FilterStack implements Filter { return result; } - refresh(): void { - for (const filter of this.filters) { - filter.refresh(); - } - } - push(filters: Filter[]) { this.filters.unshift(...filters); } @@ -94,9 +88,9 @@ export class FilterStackFactory implements FilterFactory { this.factories.unshift(...filterFactories); } - createFilter(callStream: Call): FilterStack { + createFilter(): FilterStack { return new FilterStack( - this.factories.map((factory) => factory.createFilter(callStream)) + this.factories.map((factory) => factory.createFilter()) ); } } diff --git a/packages/grpc-js/src/filter.ts b/packages/grpc-js/src/filter.ts index 8475a0a5a..5313f91a8 100644 --- a/packages/grpc-js/src/filter.ts +++ b/packages/grpc-js/src/filter.ts @@ -15,12 +15,14 @@ * */ -import { Call, StatusObject, WriteObject } from './call-stream'; +import { StatusObject, WriteObject } from './call-interface'; import { Metadata } from './metadata'; /** * Filter classes represent related per-call logic and state that is primarily - * used to modify incoming and outgoing data + * used to modify incoming and outgoing data. All async filters can be + * rejected. The rejection error must be a StatusObject, and a rejection will + * cause the call to end with that status. */ export interface Filter { sendMetadata(metadata: Promise): Promise; @@ -32,8 +34,6 @@ export interface Filter { receiveMessage(message: Promise): Promise; receiveTrailers(status: StatusObject): StatusObject; - - refresh(): void; } export abstract class BaseFilter implements Filter { @@ -56,10 +56,8 @@ export abstract class BaseFilter implements Filter { receiveTrailers(status: StatusObject): StatusObject { return status; } - - refresh(): void {} } export interface FilterFactory { - createFilter(callStream: Call): T; + createFilter(): T; } diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index b4855fb59..786fa9925 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -23,7 +23,7 @@ import { ServiceError, } from './call'; import { CallCredentials, OAuth2Client } from './call-credentials'; -import { Deadline, StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { Channel, ChannelImplementation } from './channel'; import { CompressionAlgorithms } from './compression-algorithms'; import { ConnectivityState } from './connectivity-state'; @@ -237,7 +237,7 @@ export const getClientChannel = (client: Client) => { export { StatusBuilder }; -export { Listener } from './call-stream'; +export { Listener } from './call-interface'; export { Requester, @@ -275,6 +275,7 @@ import * as load_balancer_pick_first from './load-balancer-pick-first'; import * as load_balancer_round_robin from './load-balancer-round-robin'; import * as load_balancer_outlier_detection from './load-balancer-outlier-detection'; import * as channelz from './channelz'; +import { Deadline } from './deadline'; const clientVersion = require('../../package.json').version; diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts new file mode 100644 index 000000000..772ed8e1d --- /dev/null +++ b/packages/grpc-js/src/internal-channel.ts @@ -0,0 +1,504 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { ChannelCredentials } from './channel-credentials'; +import { ChannelOptions } from './channel-options'; +import { ResolvingLoadBalancer } from './resolving-load-balancer'; +import { SubchannelPool, getSubchannelPool } from './subchannel-pool'; +import { ChannelControlHelper } from './load-balancer'; +import { UnavailablePicker, Picker, PickResultType } from './picker'; +import { Metadata } from './metadata'; +import { Status, LogVerbosity, Propagate } from './constants'; +import { FilterStackFactory } from './filter-stack'; +import { CompressionFilterFactory } from './compression-filter'; +import { + CallConfig, + ConfigSelector, + getDefaultAuthority, + mapUriDefaultScheme, +} from './resolver'; +import { trace, log } from './logging'; +import { SubchannelAddress } from './subchannel-address'; +import { MaxMessageSizeFilterFactory } from './max-message-size-filter'; +import { mapProxyName } from './http_proxy'; +import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; +import { ServerSurfaceCall } from './server-call'; +import { Filter } from './filter'; + +import { ConnectivityState } from './connectivity-state'; +import { ChannelInfo, ChannelRef, ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzChannel, SubchannelRef, unregisterChannelzRef } from './channelz'; +import { Subchannel } from './subchannel'; +import { LoadBalancingCall } from './load-balancing-call'; +import { CallCredentials } from './call-credentials'; +import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from './call-interface'; +import { SubchannelCall } from './subchannel-call'; +import { Deadline, getDeadlineTimeoutString } from './deadline'; +import { ResolvingCall } from './resolving-call'; +import { getNextCallNumber } from './call-number'; + +/** + * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args + */ +const MAX_TIMEOUT_TIME = 2147483647; + +interface ConnectivityStateWatcher { + currentState: ConnectivityState; + timer: NodeJS.Timeout | null; + callback: (error?: Error) => void; +} + +export class InternalChannel { + + private resolvingLoadBalancer: ResolvingLoadBalancer; + private subchannelPool: SubchannelPool; + private connectivityState: ConnectivityState = ConnectivityState.IDLE; + private currentPicker: Picker = new UnavailablePicker(); + /** + * Calls queued up to get a call config. Should only be populated before the + * first time the resolver returns a result, which includes the ConfigSelector. + */ + private configSelectionQueue: ResolvingCall[] = []; + private pickQueue: LoadBalancingCall[] = []; + private connectivityStateWatchers: ConnectivityStateWatcher[] = []; + private defaultAuthority: string; + private filterStackFactory: FilterStackFactory; + private target: GrpcUri; + /** + * This timer does not do anything on its own. Its purpose is to hold the + * event loop open while there are any pending calls for the channel that + * have not yet been assigned to specific subchannels. In other words, + * the invariant is that callRefTimer is reffed if and only if pickQueue + * is non-empty. + */ + private callRefTimer: NodeJS.Timer; + private configSelector: ConfigSelector | null = null; + + // Channelz info + private readonly channelzEnabled: boolean = true; + private originalTarget: string; + private channelzRef: ChannelRef; + private channelzTrace: ChannelzTrace; + private callTracker = new ChannelzCallTracker(); + private childrenTracker = new ChannelzChildrenTracker(); + + constructor( + target: string, + private readonly credentials: ChannelCredentials, + private readonly options: ChannelOptions + ) { + if (typeof target !== 'string') { + throw new TypeError('Channel target must be a string'); + } + if (!(credentials instanceof ChannelCredentials)) { + throw new TypeError( + 'Channel credentials must be a ChannelCredentials object' + ); + } + if (options) { + if (typeof options !== 'object') { + throw new TypeError('Channel options must be an object'); + } + } + this.originalTarget = target; + const originalTargetUri = parseUri(target); + if (originalTargetUri === null) { + throw new Error(`Could not parse target name "${target}"`); + } + /* This ensures that the target has a scheme that is registered with the + * resolver */ + const defaultSchemeMapResult = mapUriDefaultScheme(originalTargetUri); + if (defaultSchemeMapResult === null) { + throw new Error( + `Could not find a default scheme for target name "${target}"` + ); + } + + this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); + this.callRefTimer.unref?.(); + + if (this.options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + + this.channelzTrace = new ChannelzTrace(); + if (this.channelzEnabled) { + this.channelzRef = registerChannelzChannel(target, () => this.getChannelzInfo()); + this.channelzTrace.addTrace('CT_INFO', 'Channel created'); + } else { + // Dummy channelz ref that will never be used + this.channelzRef = { + kind: 'channel', + id: -1, + name: '' + }; + } + + if (this.options['grpc.default_authority']) { + this.defaultAuthority = this.options['grpc.default_authority'] as string; + } else { + this.defaultAuthority = getDefaultAuthority(defaultSchemeMapResult); + } + const proxyMapResult = mapProxyName(defaultSchemeMapResult, options); + this.target = proxyMapResult.target; + this.options = Object.assign({}, this.options, proxyMapResult.extraOptions); + + /* The global boolean parameter to getSubchannelPool has the inverse meaning to what + * the grpc.use_local_subchannel_pool channel option means. */ + this.subchannelPool = getSubchannelPool( + (options['grpc.use_local_subchannel_pool'] ?? 0) === 0 + ); + const channelControlHelper: ChannelControlHelper = { + createSubchannel: ( + subchannelAddress: SubchannelAddress, + subchannelArgs: ChannelOptions + ) => { + const subchannel = this.subchannelPool.getOrCreateSubchannel( + this.target, + subchannelAddress, + Object.assign({}, this.options, subchannelArgs), + this.credentials + ); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef()); + } + return subchannel; + }, + updateState: (connectivityState: ConnectivityState, picker: Picker) => { + this.currentPicker = picker; + const queueCopy = this.pickQueue.slice(); + this.pickQueue = []; + this.callRefTimerUnref(); + for (const call of queueCopy) { + call.doPick(); + } + this.updateState(connectivityState); + }, + requestReresolution: () => { + // This should never be called. + throw new Error( + 'Resolving load balancer should never call requestReresolution' + ); + }, + addChannelzChild: (child: ChannelRef | SubchannelRef) => { + if (this.channelzEnabled) { + this.childrenTracker.refChild(child); + } + }, + removeChannelzChild: (child: ChannelRef | SubchannelRef) => { + if (this.channelzEnabled) { + this.childrenTracker.unrefChild(child); + } + } + }; + this.resolvingLoadBalancer = new ResolvingLoadBalancer( + this.target, + channelControlHelper, + options, + (configSelector) => { + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Address resolution succeeded'); + } + this.configSelector = configSelector; + /* We process the queue asynchronously to ensure that the corresponding + * load balancer update has completed. */ + process.nextTick(() => { + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + this.callRefTimerUnref(); + for (const call of localQueue) { + call.getConfig(); + } + this.configSelectionQueue = []; + }); + }, + (status) => { + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_WARNING', 'Address resolution failed with code ' + status.code + ' and details "' + status.details + '"'); + } + if (this.configSelectionQueue.length > 0) { + this.trace('Name resolution failed with calls queued for config selection'); + } + const localQueue = this.configSelectionQueue; + this.configSelectionQueue = []; + this.callRefTimerUnref(); + for (const call of localQueue) { + call.reportResolverError(status); + } + } + ); + this.filterStackFactory = new FilterStackFactory([ + new MaxMessageSizeFilterFactory(this.options), + new CompressionFilterFactory(this, this.options), + ]); + this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2)); + const error = new Error(); + trace(LogVerbosity.DEBUG, 'channel_stacktrace', '(' + this.channelzRef.id + ') ' + 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n')+1)); + } + + private getChannelzInfo(): ChannelInfo { + return { + target: this.originalTarget, + state: this.connectivityState, + trace: this.channelzTrace, + callTracker: this.callTracker, + children: this.childrenTracker.getChildLists() + }; + } + + private trace(text: string, verbosityOverride?: LogVerbosity) { + trace(verbosityOverride ?? LogVerbosity.DEBUG, 'channel', '(' + this.channelzRef.id + ') ' + uriToString(this.target) + ' ' + text); + } + + private callRefTimerRef() { + // If the hasRef function does not exist, always run the code + if (!this.callRefTimer.hasRef?.()) { + this.trace( + 'callRefTimer.ref | configSelectionQueue.length=' + + this.configSelectionQueue.length + + ' pickQueue.length=' + + this.pickQueue.length + ); + this.callRefTimer.ref?.(); + } + } + + private callRefTimerUnref() { + // If the hasRef function does not exist, always run the code + if (!this.callRefTimer.hasRef || this.callRefTimer.hasRef()) { + this.trace( + 'callRefTimer.unref | configSelectionQueue.length=' + + this.configSelectionQueue.length + + ' pickQueue.length=' + + this.pickQueue.length + ); + this.callRefTimer.unref?.(); + } + } + + private removeConnectivityStateWatcher( + watcherObject: ConnectivityStateWatcher + ) { + const watcherIndex = this.connectivityStateWatchers.findIndex( + (value) => value === watcherObject + ); + if (watcherIndex >= 0) { + this.connectivityStateWatchers.splice(watcherIndex, 1); + } + } + + private updateState(newState: ConnectivityState): void { + trace( + LogVerbosity.DEBUG, + 'connectivity_state', + '(' + this.channelzRef.id + ') ' + + uriToString(this.target) + + ' ' + + ConnectivityState[this.connectivityState] + + ' -> ' + + ConnectivityState[newState] + ); + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', ConnectivityState[this.connectivityState] + ' -> ' + ConnectivityState[newState]); + } + this.connectivityState = newState; + const watchersCopy = this.connectivityStateWatchers.slice(); + for (const watcherObject of watchersCopy) { + if (newState !== watcherObject.currentState) { + if (watcherObject.timer) { + clearTimeout(watcherObject.timer); + } + this.removeConnectivityStateWatcher(watcherObject); + watcherObject.callback(); + } + } + } + + doPick(metadata: Metadata, extraPickInfo: {[key: string]: string}) { + return this.currentPicker.pick({metadata: metadata, extraPickInfo: extraPickInfo}); + } + + queueCallForPick(call: LoadBalancingCall) { + this.pickQueue.push(call); + this.callRefTimerRef(); + } + + getConfig(method: string, metadata: Metadata) { + this.resolvingLoadBalancer.exitIdle(); + return this.configSelector?.(method, metadata) ?? null; + } + + queueCallForConfig(call: ResolvingCall) { + this.configSelectionQueue.push(call); + this.callRefTimerRef(); + } + + createLoadBalancingCall( + callConfig: CallConfig, + method: string, + host: string, + credentials: CallCredentials, + deadline: Deadline + ): LoadBalancingCall { + const callNumber = getNextCallNumber(); + this.trace( + 'createLoadBalancingCall [' + + callNumber + + '] method="' + + method + + '"' + ); + return new LoadBalancingCall(this, callConfig, method, host, credentials, deadline, this.filterStackFactory, callNumber); + } + + createInnerCall( + callConfig: CallConfig, + method: string, + host: string, + credentials: CallCredentials, + deadline: Deadline + ): Call { + // Create a RetryingCall if retries are enabled + return this.createLoadBalancingCall(callConfig, method, host, credentials, deadline); + } + + createResolvingCall( + method: string, + deadline: Deadline, + host: string | null | undefined, + parentCall: ServerSurfaceCall | null, + propagateFlags: number | null | undefined + ): ResolvingCall { + const callNumber = getNextCallNumber(); + this.trace( + 'createResolvingCall [' + + callNumber + + '] method="' + + method + + '", deadline=' + + deadline + ); + const finalOptions: CallStreamOptions = { + deadline: deadline, + flags: propagateFlags ?? Propagate.DEFAULTS, + host: host ?? this.defaultAuthority, + parentCall: parentCall, + }; + + const call = new ResolvingCall(this, method, finalOptions, this.filterStackFactory, this.credentials._getCallCredentials(), getNextCallNumber()); + + if (this.channelzEnabled) { + this.callTracker.addCallStarted(); + call.addStatusWatcher(status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }); + } + return call; + + } + + close() { + this.resolvingLoadBalancer.destroy(); + this.updateState(ConnectivityState.SHUTDOWN); + clearInterval(this.callRefTimer); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } + + this.subchannelPool.unrefUnusedSubchannels(); + } + + getTarget() { + return uriToString(this.target); + } + + getConnectivityState(tryToConnect: boolean) { + const connectivityState = this.connectivityState; + if (tryToConnect) { + this.resolvingLoadBalancer.exitIdle(); + } + return connectivityState; + } + + watchConnectivityState( + currentState: ConnectivityState, + deadline: Date | number, + callback: (error?: Error) => void + ): void { + if (this.connectivityState === ConnectivityState.SHUTDOWN) { + throw new Error('Channel has been shut down'); + } + let timer = null; + if (deadline !== Infinity) { + const deadlineDate: Date = + deadline instanceof Date ? deadline : new Date(deadline); + const now = new Date(); + if (deadline === -Infinity || deadlineDate <= now) { + process.nextTick( + callback, + new Error('Deadline passed without connectivity state change') + ); + return; + } + timer = setTimeout(() => { + this.removeConnectivityStateWatcher(watcherObject); + callback( + new Error('Deadline passed without connectivity state change') + ); + }, deadlineDate.getTime() - now.getTime()); + } + const watcherObject = { + currentState, + callback, + timer, + }; + this.connectivityStateWatchers.push(watcherObject); + } + + /** + * Get the channelz reference object for this channel. The returned value is + * garbage if channelz is disabled for this channel. + * @returns + */ + getChannelzRef() { + return this.channelzRef; + } + + createCall( + method: string, + deadline: Deadline, + host: string | null | undefined, + parentCall: ServerSurfaceCall | null, + propagateFlags: number | null | undefined + ): Call { + if (typeof method !== 'string') { + throw new TypeError('Channel#createCall: method must be a string'); + } + if (!(typeof deadline === 'number' || deadline instanceof Date)) { + throw new TypeError( + 'Channel#createCall: deadline must be a number or Date' + ); + } + if (this.connectivityState === ConnectivityState.SHUTDOWN) { + throw new Error('Channel has been shut down'); + } + return this.createResolvingCall(method, deadline, host, parentCall, propagateFlags); + } +} diff --git a/packages/grpc-js/src/load-balancer-outlier-detection.ts b/packages/grpc-js/src/load-balancer-outlier-detection.ts index e69e2ef99..5af0e79e7 100644 --- a/packages/grpc-js/src/load-balancer-outlier-detection.ts +++ b/packages/grpc-js/src/load-balancer-outlier-detection.ts @@ -15,8 +15,7 @@ * */ -import { ChannelOptions, connectivityState, StatusObject } from "."; -import { Call } from "./call-stream"; +import { ChannelOptions } from "./channel-options"; import { ConnectivityState } from "./connectivity-state"; import { Status } from "./constants"; import { durationToMs, isDuration, msToDuration } from "./duration"; @@ -311,28 +310,6 @@ interface MapEntry { subchannelWrappers: OutlierDetectionSubchannelWrapper[]; } -class OutlierDetectionCounterFilter extends BaseFilter implements Filter { - constructor(private callCounter: CallCounter) { - super(); - } - receiveTrailers(status: StatusObject): StatusObject { - if (status.code === Status.OK) { - this.callCounter.addSuccess(); - } else { - this.callCounter.addFailure(); - } - return status; - } -} - -class OutlierDetectionCounterFilterFactory implements FilterFactory { - constructor(private callCounter: CallCounter) {} - createFilter(callStream: Call): OutlierDetectionCounterFilter { - return new OutlierDetectionCounterFilter(this.callCounter); - } - -} - class OutlierDetectionPicker implements Picker { constructor(private wrappedPicker: Picker) {} pick(pickArgs: PickArgs): PickResult { @@ -344,7 +321,14 @@ class OutlierDetectionPicker implements Picker { return { ...wrappedPick, subchannel: subchannelWrapper.getWrappedSubchannel(), - extraFilterFactories: [...wrappedPick.extraFilterFactories, new OutlierDetectionCounterFilterFactory(mapEntry.counter)] + onCallEnded: statusCode => { + if (statusCode === Status.OK) { + mapEntry.counter.addSuccess(); + } else { + mapEntry.counter.addFailure(); + } + wrappedPick.onCallEnded?.(statusCode); + } }; } else { return wrappedPick; diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 884af50b7..bb95aba13 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -84,8 +84,8 @@ class PickFirstPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: this.subchannel, status: null, - extraFilterFactories: [], onCallStarted: null, + onCallEnded: null }; } } diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 8a4094a02..91c10b238 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -78,8 +78,8 @@ class RoundRobinPicker implements Picker { pickResultType: PickResultType.COMPLETE, subchannel: pickedSubchannel, status: null, - extraFilterFactories: [], onCallStarted: null, + onCallEnded: null }; } diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts new file mode 100644 index 000000000..f791b86bc --- /dev/null +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -0,0 +1,281 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { CallCredentials } from "./call-credentials"; +import { Call, InterceptingListener, MessageContext, StatusObject } from "./call-interface"; +import { SubchannelCall } from "./subchannel-call"; +import { ConnectivityState } from "./connectivity-state"; +import { LogVerbosity, Status } from "./constants"; +import { Deadline, getDeadlineTimeoutString } from "./deadline"; +import { FilterStack, FilterStackFactory } from "./filter-stack"; +import { InternalChannel } from "./internal-channel"; +import { Metadata } from "./metadata"; +import { PickResultType } from "./picker"; +import { CallConfig } from "./resolver"; +import { splitHostPort } from "./uri-parser"; +import * as logging from './logging'; + +const TRACER_NAME = 'load_balancing_call'; + +export type RpcProgress = 'NOT_STARTED' | 'DROP' | 'REFUSED' | 'PROCESSED'; + +export interface StatusObjectWithProgress extends StatusObject { + progress: RpcProgress; +} + +export class LoadBalancingCall implements Call { + private child: SubchannelCall | null = null; + private readPending = false; + private writeFilterPending = false; + private pendingMessage: {context: MessageContext, message: Buffer} | null = null; + private pendingHalfClose = false; + private ended = false; + private serviceUrl: string; + private filterStack: FilterStack; + private metadata: Metadata | null = null; + private listener: InterceptingListener | null = null; + private onCallEnded: ((statusCode: Status) => void) | null = null; + constructor( + private readonly channel: InternalChannel, + private readonly callConfig: CallConfig, + private readonly methodName: string, + private readonly host : string, + private readonly credentials: CallCredentials, + private readonly deadline: Deadline, + filterStackFactory: FilterStackFactory, + private readonly callNumber: number + ) { + this.filterStack = filterStackFactory.createFilter(); + + const splitPath: string[] = this.methodName.split('/'); + let serviceName = ''; + /* The standard path format is "/{serviceName}/{methodName}", so if we split + * by '/', the first item should be empty and the second should be the + * service name */ + if (splitPath.length >= 2) { + serviceName = splitPath[1]; + } + const hostname = splitHostPort(this.host)?.host ?? 'localhost'; + /* Currently, call credentials are only allowed on HTTPS connections, so we + * can assume that the scheme is "https" */ + this.serviceUrl = `https://${hostname}/${serviceName}`; + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callNumber + '] ' + text + ); + } + + private outputStatus(status: StatusObject, progress: RpcProgress) { + if (!this.ended) { + this.ended = true; + this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"'); + const filteredStatus = this.filterStack.receiveTrailers(status); + const finalStatus = {...filteredStatus, progress}; + this.listener?.onReceiveStatus(finalStatus); + this.onCallEnded?.(finalStatus.code); + } + } + + doPick() { + if (this.ended) { + return; + } + if (!this.metadata) { + throw new Error('doPick called before start'); + } + const pickResult = this.channel.doPick(this.metadata, this.callConfig.pickInformation); + const subchannelString = pickResult.subchannel ? + '(' + pickResult.subchannel.getChannelzRef().id + ') ' + pickResult.subchannel.getAddress() : + '' + pickResult.subchannel; + this.trace( + 'Pick result: ' + + PickResultType[pickResult.pickResultType] + + ' subchannel: ' + + subchannelString + + ' status: ' + + pickResult.status?.code + + ' ' + + pickResult.status?.details + ); + switch (pickResult.pickResultType) { + case PickResultType.COMPLETE: + this.credentials.generateMetadata({service_url: this.serviceUrl}).then( + (credsMetadata) => { + const finalMetadata = this.metadata!.clone(); + finalMetadata.merge(credsMetadata); + if (finalMetadata.get('authorization').length > 1) { + this.outputStatus( + { + code: Status.INTERNAL, + details: '"authorization" metadata cannot have multiple values', + metadata: new Metadata() + }, + 'PROCESSED' + ); + } + if (pickResult.subchannel!.getConnectivityState() !== ConnectivityState.READY) { + this.trace( + 'Picked subchannel ' + + subchannelString + + ' has state ' + + ConnectivityState[pickResult.subchannel!.getConnectivityState()] + + ' after getting credentials metadata. Retrying pick' + ); + this.doPick(); + return; + } + + if (this.deadline !== Infinity) { + finalMetadata.set('grpc-timeout', getDeadlineTimeoutString(this.deadline)); + } + try { + this.child = pickResult.subchannel!.getRealSubchannel().createCall(finalMetadata, this.host, this.methodName, { + onReceiveMetadata: metadata => { + this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata)); + }, + onReceiveMessage: message => { + this.filterStack.receiveMessage(message).then(filteredMesssage => { + this.listener!.onReceiveMessage(filteredMesssage); + }, (status: StatusObject) => { + this.cancelWithStatus(status.code, status.details); + }); + }, + onReceiveStatus: status => { + this.outputStatus(status, 'PROCESSED'); + } + }); + } catch (error) { + this.trace( + 'Failed to start call on picked subchannel ' + + subchannelString + + ' with error ' + + (error as Error).message + ); + this.outputStatus( + { + code: Status.INTERNAL, + details: 'Failed to start HTTP/2 stream with error ' + (error as Error).message, + metadata: new Metadata() + }, + 'NOT_STARTED' + ); + return; + } + this.callConfig.onCommitted?.(); + pickResult.onCallStarted?.(); + this.onCallEnded = pickResult.onCallEnded; + this.trace('Created child call [' + this.child.getCallNumber() + ']'); + if (this.readPending) { + this.child.startRead(); + } + if (this.pendingMessage) { + this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message); + } + if (this.pendingHalfClose && !this.writeFilterPending) { + this.child.halfClose(); + } + }, (error: Error & { code: number }) => { + // We assume the error code isn't 0 (Status.OK) + this.outputStatus( + { + code: typeof error.code === 'number' ? error.code : Status.UNKNOWN, + details: `Getting metadata from plugin failed with error: ${error.message}`, + metadata: new Metadata() + }, + 'PROCESSED' + ); + } + ); + break; + case PickResultType.DROP: + this.outputStatus(pickResult.status!, 'DROP'); + break; + case PickResultType.TRANSIENT_FAILURE: + if (this.metadata.getOptions().waitForReady) { + this.channel.queueCallForPick(this); + } else { + this.outputStatus(pickResult.status!, 'PROCESSED'); + } + break; + case PickResultType.QUEUE: + this.channel.queueCallForPick(this); + } + } + + cancelWithStatus(status: Status, details: string): void { + this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"'); + this.child?.cancelWithStatus(status, details); + this.outputStatus({code: status, details: details, metadata: new Metadata()}, 'PROCESSED'); + } + getPeer(): string { + return this.child?.getPeer() ?? this.channel.getTarget(); + } + start(metadata: Metadata, listener: InterceptingListener): void { + this.trace('start called'); + this.listener = listener; + this.filterStack.sendMetadata(Promise.resolve(metadata)).then(filteredMetadata => { + this.metadata = filteredMetadata; + this.doPick(); + }, (status: StatusObject) => { + this.outputStatus(status, 'PROCESSED'); + }); + } + sendMessageWithContext(context: MessageContext, message: Buffer): void { + this.trace('write() called with message of length ' + message.length); + this.writeFilterPending = true; + this.filterStack.sendMessage(Promise.resolve({message: message, flags: context.flags})).then((filteredMessage) => { + this.writeFilterPending = false; + if (this.child) { + this.child.sendMessageWithContext(context, filteredMessage.message); + if (this.pendingHalfClose) { + this.child.halfClose(); + } + } else { + this.pendingMessage = {context, message: filteredMessage.message}; + } + }, (status: StatusObject) => { + this.cancelWithStatus(status.code, status.details); + }) + } + startRead(): void { + this.trace('startRead called'); + if (this.child) { + this.child.startRead(); + } else { + this.readPending = true; + } + } + halfClose(): void { + this.trace('halfClose called'); + if (this.child && !this.writeFilterPending) { + this.child.halfClose(); + } else { + this.pendingHalfClose = true; + } + } + setCredentials(credentials: CallCredentials): void { + throw new Error("Method not implemented."); + } + + getCallNumber(): number { + return this.callNumber; + } +} \ No newline at end of file diff --git a/packages/grpc-js/src/max-message-size-filter.ts b/packages/grpc-js/src/max-message-size-filter.ts index 9a3bc9c0a..62d01077c 100644 --- a/packages/grpc-js/src/max-message-size-filter.ts +++ b/packages/grpc-js/src/max-message-size-filter.ts @@ -16,20 +16,20 @@ */ import { BaseFilter, Filter, FilterFactory } from './filter'; -import { Call, WriteObject } from './call-stream'; +import { WriteObject } from './call-interface'; import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, } from './constants'; import { ChannelOptions } from './channel-options'; +import { Metadata } from './metadata'; export class MaxMessageSizeFilter extends BaseFilter implements Filter { private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH; private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH; constructor( - private readonly options: ChannelOptions, - private readonly callStream: Call + private readonly options: ChannelOptions ) { super(); if ('grpc.max_send_message_length' in options) { @@ -48,11 +48,11 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter { } else { const concreteMessage = await message; if (concreteMessage.message.length > this.maxSendMessageSize) { - this.callStream.cancelWithStatus( - Status.RESOURCE_EXHAUSTED, - `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})` - ); - return Promise.reject('Message too large'); + throw { + code: Status.RESOURCE_EXHAUSTED, + details: `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`, + metadata: new Metadata() + }; } else { return concreteMessage; } @@ -67,11 +67,11 @@ export class MaxMessageSizeFilter extends BaseFilter implements Filter { } else { const concreteMessage = await message; if (concreteMessage.length > this.maxReceiveMessageSize) { - this.callStream.cancelWithStatus( - Status.RESOURCE_EXHAUSTED, - `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})` - ); - return Promise.reject('Message too large'); + throw { + code: Status.RESOURCE_EXHAUSTED, + details: `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`, + metadata: new Metadata() + }; } else { return concreteMessage; } @@ -83,7 +83,7 @@ export class MaxMessageSizeFilterFactory implements FilterFactory { constructor(private readonly options: ChannelOptions) {} - createFilter(callStream: Call): MaxMessageSizeFilter { - return new MaxMessageSizeFilter(this.options, callStream); + createFilter(): MaxMessageSizeFilter { + return new MaxMessageSizeFilter(this.options); } } diff --git a/packages/grpc-js/src/picker.ts b/packages/grpc-js/src/picker.ts index f366a6919..162596ef5 100644 --- a/packages/grpc-js/src/picker.ts +++ b/packages/grpc-js/src/picker.ts @@ -15,12 +15,10 @@ * */ -import { Subchannel } from './subchannel'; -import { StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { Metadata } from './metadata'; import { Status } from './constants'; import { LoadBalancer } from './load-balancer'; -import { FilterFactory, Filter } from './filter'; import { SubchannelInterface } from './subchannel-interface'; export enum PickResultType { @@ -43,45 +41,40 @@ export interface PickResult { * `pickResultType` is TRANSIENT_FAILURE. */ status: StatusObject | null; - /** - * Extra FilterFactory (can be multiple encapsulated in a FilterStackFactory) - * provided by the load balancer to be used with the call. For technical - * reasons filters from this factory will not see sendMetadata events. - */ - extraFilterFactories: FilterFactory[]; onCallStarted: (() => void) | null; + onCallEnded: ((statusCode: Status) => void) | null; } export interface CompletePickResult extends PickResult { pickResultType: PickResultType.COMPLETE; subchannel: SubchannelInterface | null; status: null; - extraFilterFactories: FilterFactory[]; onCallStarted: (() => void) | null; + onCallEnded: ((statusCode: Status) => void) | null; } export interface QueuePickResult extends PickResult { pickResultType: PickResultType.QUEUE; subchannel: null; status: null; - extraFilterFactories: []; onCallStarted: null; + onCallEnded: null; } export interface TransientFailurePickResult extends PickResult { pickResultType: PickResultType.TRANSIENT_FAILURE; subchannel: null; status: StatusObject; - extraFilterFactories: []; onCallStarted: null; + onCallEnded: null; } export interface DropCallPickResult extends PickResult { pickResultType: PickResultType.DROP; subchannel: null; status: StatusObject; - extraFilterFactories: []; onCallStarted: null; + onCallEnded: null; } export interface PickArgs { @@ -120,8 +113,8 @@ export class UnavailablePicker implements Picker { pickResultType: PickResultType.TRANSIENT_FAILURE, subchannel: null, status: this.status, - extraFilterFactories: [], onCallStarted: null, + onCallEnded: null }; } } @@ -149,8 +142,8 @@ export class QueuePicker { pickResultType: PickResultType.QUEUE, subchannel: null, status: null, - extraFilterFactories: [], onCallStarted: null, + onCallEnded: null }; } } diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index c4cb64a74..7c4028b06 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -24,7 +24,7 @@ import * as dns from 'dns'; import * as util from 'util'; import { extractAndSelectServiceConfig, ServiceConfig } from './service-config'; import { Status } from './constants'; -import { StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { Metadata } from './metadata'; import * as logging from './logging'; import { LogVerbosity } from './constants'; diff --git a/packages/grpc-js/src/resolver-ip.ts b/packages/grpc-js/src/resolver-ip.ts index 24efc3fdc..efb0b8dcb 100644 --- a/packages/grpc-js/src/resolver-ip.ts +++ b/packages/grpc-js/src/resolver-ip.ts @@ -15,7 +15,7 @@ */ import { isIPv4, isIPv6 } from 'net'; -import { StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { ChannelOptions } from './channel-options'; import { LogVerbosity, Status } from './constants'; import { Metadata } from './metadata'; diff --git a/packages/grpc-js/src/resolver.ts b/packages/grpc-js/src/resolver.ts index fcbc6894a..770004487 100644 --- a/packages/grpc-js/src/resolver.ts +++ b/packages/grpc-js/src/resolver.ts @@ -16,7 +16,7 @@ */ import { MethodConfig, ServiceConfig } from './service-config'; -import { StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { SubchannelAddress } from './subchannel-address'; import { GrpcUri, uriToString } from './uri-parser'; import { ChannelOptions } from './channel-options'; diff --git a/packages/grpc-js/src/resolving-call.ts b/packages/grpc-js/src/resolving-call.ts new file mode 100644 index 000000000..4cf24b4b1 --- /dev/null +++ b/packages/grpc-js/src/resolving-call.ts @@ -0,0 +1,219 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { CallCredentials } from "./call-credentials"; +import { Call, CallStreamOptions, InterceptingListener, MessageContext, StatusObject } from "./call-interface"; +import { LogVerbosity, Propagate, Status } from "./constants"; +import { Deadline, getDeadlineTimeoutString, getRelativeTimeout, minDeadline } from "./deadline"; +import { FilterStackFactory } from "./filter-stack"; +import { InternalChannel } from "./internal-channel"; +import { Metadata } from "./metadata"; +import * as logging from './logging'; + +const TRACER_NAME = 'resolving_call'; + +export class ResolvingCall implements Call { + private child: Call | null = null; + private readPending = false; + private pendingMessage: {context: MessageContext, message: Buffer} | null = null; + private pendingHalfClose = false; + private ended = false; + private metadata: Metadata | null = null; + private listener: InterceptingListener | null = null; + private deadline: Deadline; + private host: string; + private statusWatchers: ((status: StatusObject) => void)[] = []; + private deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); + + constructor( + private readonly channel: InternalChannel, + private readonly method: string, + options: CallStreamOptions, + private readonly filterStackFactory: FilterStackFactory, + private credentials: CallCredentials, + private callNumber: number + ) { + this.deadline = options.deadline; + this.host = options.host; + if (options.parentCall) { + if (options.flags & Propagate.CANCELLATION) { + options.parentCall.on('cancelled', () => { + this.cancelWithStatus(Status.CANCELLED, 'Cancelled by parent call'); + }); + } + if (options.flags & Propagate.DEADLINE) { + this.trace('Propagating deadline from parent: ' + options.parentCall.getDeadline()); + this.deadline = minDeadline(this.deadline, options.parentCall.getDeadline()); + } + } + this.trace('Created'); + this.runDeadlineTimer(); + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callNumber + '] ' + text + ); + } + + private runDeadlineTimer() { + clearTimeout(this.deadlineTimer); + this.trace('Deadline: ' + this.deadline); + if (this.deadline !== Infinity) { + const timeout = getRelativeTimeout(this.deadline); + this.trace('Deadline will be reached in ' + timeout + 'ms'); + const handleDeadline = () => { + this.cancelWithStatus( + Status.DEADLINE_EXCEEDED, + 'Deadline exceeded' + ); + } + if (timeout <= 0) { + process.nextTick(handleDeadline); + } else { + this.deadlineTimer = setTimeout(handleDeadline, timeout); + } + } + } + + private outputStatus(status: StatusObject) { + if (!this.ended) { + this.ended = true; + this.trace('ended with status: code=' + status.code + ' details="' + status.details + '"'); + this.statusWatchers.forEach(watcher => watcher(status)); + process.nextTick(() => { + this.listener?.onReceiveStatus(status); + }); + } + } + + getConfig(): void { + if (this.ended) { + return; + } + if (!this.metadata || !this.listener) { + throw new Error('getConfig called before start'); + } + const config = this.channel.getConfig(this.method, this.metadata); + if (!config) { + this.channel.queueCallForConfig(this); + return; + } + if (config.status !== Status.OK) { + this.outputStatus({ + code: config.status, + details: 'Failed to route call to ' + this.method, + metadata: new Metadata() + }); + return; + } + + if (config.methodConfig.timeout) { + const configDeadline = new Date(); + configDeadline.setSeconds( + configDeadline.getSeconds() + config.methodConfig.timeout.seconds + ); + configDeadline.setMilliseconds( + configDeadline.getMilliseconds() + + config.methodConfig.timeout.nanos / 1_000_000 + ); + this.deadline = minDeadline(this.deadline, configDeadline); + } + + this.filterStackFactory.push(config.dynamicFilterFactories); + + this.child = this.channel.createInnerCall(config, this.method, this.host, this.credentials, this.deadline); + this.child.start(this.metadata, { + onReceiveMetadata: metadata => { + this.listener!.onReceiveMetadata(metadata); + }, + onReceiveMessage: message => { + this.listener!.onReceiveMessage(message); + }, + onReceiveStatus: status => { + this.outputStatus(status); + } + }); + if (this.readPending) { + this.child.startRead(); + } + if (this.pendingMessage) { + this.child.sendMessageWithContext(this.pendingMessage.context, this.pendingMessage.message); + } + if (this.pendingHalfClose) { + this.child.halfClose(); + } + } + reportResolverError(status: StatusObject) { + if (this.metadata?.getOptions().waitForReady) { + this.channel.queueCallForConfig(this); + } else { + this.outputStatus(status); + } + } + cancelWithStatus(status: Status, details: string): void { + this.trace('cancelWithStatus code: ' + status + ' details: "' + details + '"'); + this.child?.cancelWithStatus(status, details); + this.outputStatus({code: status, details: details, metadata: new Metadata()}); + } + getPeer(): string { + return this.child?.getPeer() ?? this.channel.getTarget(); + } + start(metadata: Metadata, listener: InterceptingListener): void { + this.trace('start called'); + this.metadata = metadata.clone(); + this.listener = listener; + this.getConfig(); + } + sendMessageWithContext(context: MessageContext, message: Buffer): void { + this.trace('write() called with message of length ' + message.length); + if (this.child) { + this.child.sendMessageWithContext(context, message); + } else { + this.pendingMessage = {context, message}; + } + } + startRead(): void { + this.trace('startRead called'); + if (this.child) { + this.child.startRead(); + } else { + this.readPending = true; + } + } + halfClose(): void { + this.trace('halfClose called'); + if (this.child) { + this.child.halfClose(); + } else { + this.pendingHalfClose = true; + } + } + setCredentials(credentials: CallCredentials): void { + this.credentials = this.credentials.compose(credentials); + } + + addStatusWatcher(watcher: (status: StatusObject) => void) { + this.statusWatchers.push(watcher); + } + + getCallNumber(): number { + return this.callNumber; + } +} \ No newline at end of file diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 907067dfc..47f7c3bb1 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -28,7 +28,7 @@ import { ServiceError } from './call'; import { Picker, UnavailablePicker, QueuePicker } from './picker'; import { BackoffOptions, BackoffTimeout } from './backoff-timeout'; import { Status } from './constants'; -import { StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { Metadata } from './metadata'; import * as logging from './logging'; import { LogVerbosity } from './constants'; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 315c9d9aa..388a8d339 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -20,7 +20,6 @@ import * as http2 from 'http2'; import { Duplex, Readable, Writable } from 'stream'; import * as zlib from 'zlib'; -import { Deadline, StatusObject } from './call-stream'; import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, @@ -33,6 +32,8 @@ import { StreamDecoder } from './stream-decoder'; import { ObjectReadable, ObjectWritable } from './object-stream'; import { ChannelOptions } from './channel-options'; import * as logging from './logging'; +import { StatusObject } from './call-interface'; +import { Deadline } from './deadline'; const TRACER_NAME = 'server_call'; @@ -508,6 +509,8 @@ export class Http2ServerCallStream< receiveMetadata(headers: http2.IncomingHttpHeaders) { const metadata = Metadata.fromHttp2Headers(headers); + trace('Request to ' + this.handler.path + ' received headers ' + JSON.stringify(metadata.toJSON())); + // TODO(cjihrig): Receive compression metadata. const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER); diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index f310597e9..12802dad8 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -27,6 +27,7 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import * as os from 'os'; +import { Status } from './constants'; import { Duration } from './duration'; import { LoadBalancingConfig, @@ -38,18 +39,40 @@ export interface MethodConfigName { method?: string; } +export interface RetryPolicy { + maxAttempts: number; + initialBackoff: string; + maxBackoff: string; + backoffMultiplier: number; + retryableStatusCodes: (Status | string)[]; +} + +export interface HedgingPolicy { + maxAttempts: number; + hedgingDelay?: string; + nonFatalStatusCodes: (Status | string)[]; +} + export interface MethodConfig { name: MethodConfigName[]; waitForReady?: boolean; timeout?: Duration; maxRequestBytes?: number; maxResponseBytes?: number; + retryPolicy?: RetryPolicy; + hedgingPolicy?: HedgingPolicy; +} + +export interface RetryThrottling { + maxTokens: number; + tokenRatio: number; } export interface ServiceConfig { loadBalancingPolicy?: string; loadBalancingConfig: LoadBalancingConfig[]; methodConfig: MethodConfig[]; + retryThrottling?: RetryThrottling; } export interface ServiceConfigCanaryConfig { diff --git a/packages/grpc-js/src/status-builder.ts b/packages/grpc-js/src/status-builder.ts index 1109af1ac..78e2ea310 100644 --- a/packages/grpc-js/src/status-builder.ts +++ b/packages/grpc-js/src/status-builder.ts @@ -15,7 +15,7 @@ * */ -import { StatusObject } from './call-stream'; +import { StatusObject } from './call-interface'; import { Status } from './constants'; import { Metadata } from './metadata'; diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts new file mode 100644 index 000000000..ac34823ba --- /dev/null +++ b/packages/grpc-js/src/subchannel-call.ts @@ -0,0 +1,504 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as http2 from 'http2'; +import * as os from 'os'; + +import { Status } from './constants'; +import { Metadata } from './metadata'; +import { StreamDecoder } from './stream-decoder'; +import { SubchannelCallStatsTracker, Subchannel } from './subchannel'; +import * as logging from './logging'; +import { LogVerbosity } from './constants'; +import { ServerSurfaceCall } from './server-call'; +import { Deadline } from './deadline'; +import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface'; + +const TRACER_NAME = 'subchannel_call'; + +const { + HTTP2_HEADER_STATUS, + HTTP2_HEADER_CONTENT_TYPE, + NGHTTP2_CANCEL, +} = http2.constants; + +/** + * https://nodejs.org/api/errors.html#errors_class_systemerror + */ +interface SystemError extends Error { + address?: string; + code: string; + dest?: string; + errno: number; + info?: object; + message: string; + path?: string; + port?: number; + syscall: string; +} + +/** + * Should do approximately the same thing as util.getSystemErrorName but the + * TypeScript types don't have that function for some reason so I just made my + * own. + * @param errno + */ +function getSystemErrorName(errno: number): string { + for (const [name, num] of Object.entries(os.constants.errno)) { + if (num === errno) { + return name; + } + } + return 'Unknown system error ' + errno; +} + +export interface SubchannelCall { + cancelWithStatus(status: Status, details: string): void; + getPeer(): string; + sendMessageWithContext(context: MessageContext, message: Buffer): void; + startRead(): void; + halfClose(): void; + getCallNumber(): number; +} + +export class Http2SubchannelCall implements SubchannelCall { + private decoder = new StreamDecoder(); + + private isReadFilterPending = false; + private canPush = false; + /** + * Indicates that an 'end' event has come from the http2 stream, so there + * will be no more data events. + */ + private readsClosed = false; + + private statusOutput = false; + + private unpushedReadMessages: Buffer[] = []; + + // Status code mapped from :status. To be used if grpc-status is not received + private mappedStatusCode: Status = Status.UNKNOWN; + + // This is populated (non-null) if and only if the call has ended + private finalStatus: StatusObject | null = null; + + private disconnectListener: () => void; + + private internalError: SystemError | null = null; + + constructor( + private readonly http2Stream: http2.ClientHttp2Stream, + private readonly callStatsTracker: SubchannelCallStatsTracker, + private readonly listener: InterceptingListener, + private readonly subchannel: Subchannel, + private readonly callId: number + ) { + this.disconnectListener = () => { + this.endCall({ + code: Status.UNAVAILABLE, + details: 'Connection dropped', + metadata: new Metadata(), + }); + }; + subchannel.addDisconnectListener(this.disconnectListener); + subchannel.callRef(); + http2Stream.on('response', (headers, flags) => { + let headersString = ''; + for (const header of Object.keys(headers)) { + headersString += '\t\t' + header + ': ' + headers[header] + '\n'; + } + this.trace('Received server headers:\n' + headersString); + switch (headers[':status']) { + // TODO(murgatroid99): handle 100 and 101 + case 400: + this.mappedStatusCode = Status.INTERNAL; + break; + case 401: + this.mappedStatusCode = Status.UNAUTHENTICATED; + break; + case 403: + this.mappedStatusCode = Status.PERMISSION_DENIED; + break; + case 404: + this.mappedStatusCode = Status.UNIMPLEMENTED; + break; + case 429: + case 502: + case 503: + case 504: + this.mappedStatusCode = Status.UNAVAILABLE; + break; + default: + this.mappedStatusCode = Status.UNKNOWN; + } + + if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) { + this.handleTrailers(headers); + } else { + let metadata: Metadata; + try { + metadata = Metadata.fromHttp2Headers(headers); + } catch (error) { + this.endCall({ + code: Status.UNKNOWN, + details: (error as Error).message, + metadata: new Metadata(), + }); + return; + } + this.listener.onReceiveMetadata(metadata); + } + }); + http2Stream.on('trailers', (headers: http2.IncomingHttpHeaders) => { + this.handleTrailers(headers); + }); + http2Stream.on('data', (data: Buffer) => { + this.trace('receive HTTP/2 data frame of length ' + data.length); + const messages = this.decoder.write(data); + + for (const message of messages) { + this.trace('parsed message of length ' + message.length); + this.callStatsTracker!.addMessageReceived(); + this.tryPush(message); + } + }); + http2Stream.on('end', () => { + this.readsClosed = true; + this.maybeOutputStatus(); + }); + http2Stream.on('close', () => { + /* Use process.next tick to ensure that this code happens after any + * "error" event that may be emitted at about the same time, so that + * we can bubble up the error message from that event. */ + process.nextTick(() => { + this.trace('HTTP/2 stream closed with code ' + http2Stream.rstCode); + /* If we have a final status with an OK status code, that means that + * we have received all of the messages and we have processed the + * trailers and the call completed successfully, so it doesn't matter + * how the stream ends after that */ + if (this.finalStatus?.code === Status.OK) { + return; + } + let code: Status; + let details = ''; + switch (http2Stream.rstCode) { + case http2.constants.NGHTTP2_NO_ERROR: + /* If we get a NO_ERROR code and we already have a status, the + * stream completed properly and we just haven't fully processed + * it yet */ + if (this.finalStatus !== null) { + return; + } + code = Status.INTERNAL; + details = `Received RST_STREAM with code ${http2Stream.rstCode}`; + break; + case http2.constants.NGHTTP2_REFUSED_STREAM: + code = Status.UNAVAILABLE; + details = 'Stream refused by server'; + break; + case http2.constants.NGHTTP2_CANCEL: + code = Status.CANCELLED; + details = 'Call cancelled'; + break; + case http2.constants.NGHTTP2_ENHANCE_YOUR_CALM: + code = Status.RESOURCE_EXHAUSTED; + details = 'Bandwidth exhausted or memory limit exceeded'; + break; + case http2.constants.NGHTTP2_INADEQUATE_SECURITY: + code = Status.PERMISSION_DENIED; + details = 'Protocol not secure enough'; + break; + case http2.constants.NGHTTP2_INTERNAL_ERROR: + code = Status.INTERNAL; + if (this.internalError === null) { + /* This error code was previously handled in the default case, and + * there are several instances of it online, so I wanted to + * preserve the original error message so that people find existing + * information in searches, but also include the more recognizable + * "Internal server error" message. */ + details = `Received RST_STREAM with code ${http2Stream.rstCode} (Internal server error)`; + } else { + if (this.internalError.code === 'ECONNRESET' || this.internalError.code === 'ETIMEDOUT') { + code = Status.UNAVAILABLE; + details = this.internalError.message; + } else { + /* The "Received RST_STREAM with code ..." error is preserved + * here for continuity with errors reported online, but the + * error message at the end will probably be more relevant in + * most cases. */ + details = `Received RST_STREAM with code ${http2Stream.rstCode} triggered by internal client error: ${this.internalError.message}`; + } + } + break; + default: + code = Status.INTERNAL; + details = `Received RST_STREAM with code ${http2Stream.rstCode}`; + } + // This is a no-op if trailers were received at all. + // This is OK, because status codes emitted here correspond to more + // catastrophic issues that prevent us from receiving trailers in the + // first place. + this.endCall({ code, details, metadata: new Metadata() }); + }); + }); + http2Stream.on('error', (err: SystemError) => { + /* We need an error handler here to stop "Uncaught Error" exceptions + * from bubbling up. However, errors here should all correspond to + * "close" events, where we will handle the error more granularly */ + /* Specifically looking for stream errors that were *not* constructed + * from a RST_STREAM response here: + * https://github.com/nodejs/node/blob/8b8620d580314050175983402dfddf2674e8e22a/lib/internal/http2/core.js#L2267 + */ + if (err.code !== 'ERR_HTTP2_STREAM_ERROR') { + this.trace( + 'Node error event: message=' + + err.message + + ' code=' + + err.code + + ' errno=' + + getSystemErrorName(err.errno) + + ' syscall=' + + err.syscall + ); + this.internalError = err; + } + this.callStatsTracker.onStreamEnd(false); + }); + } + + private outputStatus() { + /* Precondition: this.finalStatus !== null */ + if (!this.statusOutput) { + this.statusOutput = true; + this.trace( + 'ended with status: code=' + + this.finalStatus!.code + + ' details="' + + this.finalStatus!.details + + '"' + ); + this.callStatsTracker.onCallEnd(this.finalStatus!); + /* We delay the actual action of bubbling up the status to insulate the + * cleanup code in this class from any errors that may be thrown in the + * upper layers as a result of bubbling up the status. In particular, + * if the status is not OK, the "error" event may be emitted + * synchronously at the top level, which will result in a thrown error if + * the user does not handle that event. */ + process.nextTick(() => { + this.listener.onReceiveStatus(this.finalStatus!); + }); + this.subchannel.callUnref(); + this.subchannel.removeDisconnectListener(this.disconnectListener); + } + } + + private trace(text: string): void { + logging.trace( + LogVerbosity.DEBUG, + TRACER_NAME, + '[' + this.callId + '] ' + text + ); + } + + /** + * On first call, emits a 'status' event with the given StatusObject. + * Subsequent calls are no-ops. + * @param status The status of the call. + */ + private endCall(status: StatusObject): void { + /* If the status is OK and a new status comes in (e.g. from a + * deserialization failure), that new status takes priority */ + if (this.finalStatus === null || this.finalStatus.code === Status.OK) { + this.finalStatus = status; + this.maybeOutputStatus(); + } + this.destroyHttp2Stream(); + } + + private maybeOutputStatus() { + if (this.finalStatus !== null) { + /* The combination check of readsClosed and that the two message buffer + * arrays are empty checks that there all incoming data has been fully + * processed */ + if ( + this.finalStatus.code !== Status.OK || + (this.readsClosed && + this.unpushedReadMessages.length === 0 && + !this.isReadFilterPending) + ) { + this.outputStatus(); + } + } + } + + private push(message: Buffer): void { + this.trace( + 'pushing to reader message of length ' + + (message instanceof Buffer ? message.length : null) + ); + this.canPush = false; + process.nextTick(() => { + /* If we have already output the status any later messages should be + * ignored, and can cause out-of-order operation errors higher up in the + * stack. Checking as late as possible here to avoid any race conditions. + */ + if (this.statusOutput) { + return; + } + this.listener.onReceiveMessage(message); + this.maybeOutputStatus(); + }); + } + + private tryPush(messageBytes: Buffer): void { + if (this.canPush) { + this.http2Stream!.pause(); + this.push(messageBytes); + } else { + this.trace( + 'unpushedReadMessages.push message of length ' + messageBytes.length + ); + this.unpushedReadMessages.push(messageBytes); + } + } + + private handleTrailers(headers: http2.IncomingHttpHeaders) { + this.callStatsTracker.onStreamEnd(true); + let headersString = ''; + for (const header of Object.keys(headers)) { + headersString += '\t\t' + header + ': ' + headers[header] + '\n'; + } + this.trace('Received server trailers:\n' + headersString); + let metadata: Metadata; + try { + metadata = Metadata.fromHttp2Headers(headers); + } catch (e) { + metadata = new Metadata(); + } + const metadataMap = metadata.getMap(); + let code: Status = this.mappedStatusCode; + if ( + code === Status.UNKNOWN && + typeof metadataMap['grpc-status'] === 'string' + ) { + const receivedStatus = Number(metadataMap['grpc-status']); + if (receivedStatus in Status) { + code = receivedStatus; + this.trace('received status code ' + receivedStatus + ' from server'); + } + metadata.remove('grpc-status'); + } + let details = ''; + if (typeof metadataMap['grpc-message'] === 'string') { + details = decodeURI(metadataMap['grpc-message']); + metadata.remove('grpc-message'); + this.trace( + 'received status details string "' + details + '" from server' + ); + } + const status: StatusObject = { code, details, metadata }; + // This is a no-op if the call was already ended when handling headers. + this.endCall(status); + } + + private destroyHttp2Stream() { + // The http2 stream could already have been destroyed if cancelWithStatus + // is called in response to an internal http2 error. + if (!this.http2Stream.destroyed) { + /* If the call has ended with an OK status, communicate that when closing + * the stream, partly to avoid a situation in which we detect an error + * RST_STREAM as a result after we have the status */ + let code: number; + if (this.finalStatus?.code === Status.OK) { + code = http2.constants.NGHTTP2_NO_ERROR; + } else { + code = http2.constants.NGHTTP2_CANCEL; + } + this.trace('close http2 stream with code ' + code); + this.http2Stream.close(code); + } + } + + cancelWithStatus(status: Status, details: string): void { + this.trace( + 'cancelWithStatus code: ' + status + ' details: "' + details + '"' + ); + this.endCall({ code: status, details, metadata: new Metadata() }); + } + + getStatus(): StatusObject | null { + return this.finalStatus; + } + + getPeer(): string { + return this.subchannel.getAddress(); + } + + getCallNumber(): number { + return this.callId; + } + + startRead() { + /* If the stream has ended with an error, we should not emit any more + * messages and we should communicate that the stream has ended */ + if (this.finalStatus !== null && this.finalStatus.code !== Status.OK) { + this.readsClosed = true; + this.maybeOutputStatus(); + return; + } + this.canPush = true; + if (this.unpushedReadMessages.length > 0) { + const nextMessage: Buffer = this.unpushedReadMessages.shift()!; + this.push(nextMessage); + return; + } + /* Only resume reading from the http2Stream if we don't have any pending + * messages to emit */ + this.http2Stream.resume(); + } + + sendMessageWithContext(context: MessageContext, message: Buffer) { + this.trace('write() called with message of length ' + message.length); + const cb: WriteCallback = (error?: Error | null) => { + let code: Status = Status.UNAVAILABLE; + if ((error as NodeJS.ErrnoException)?.code === 'ERR_STREAM_WRITE_AFTER_END') { + code = Status.INTERNAL; + } + if (error) { + this.cancelWithStatus(code, `Write error: ${error.message}`); + } + context.callback?.(); + }; + this.trace('sending data chunk of length ' + message.length); + this.callStatsTracker.addMessageSent(); + try { + this.http2Stream!.write(message, cb); + } catch (error) { + this.endCall({ + code: Status.UNAVAILABLE, + details: `Write failed with error ${(error as Error).message}`, + metadata: new Metadata() + }); + } + } + + halfClose() { + this.trace('end() called'); + this.trace('calling end() on HTTP/2 stream'); + this.http2Stream.end(); + } +} diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 800274f99..563e69464 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -18,7 +18,6 @@ import * as http2 from 'http2'; import { ChannelCredentials } from './channel-credentials'; import { Metadata } from './metadata'; -import { Call, Http2CallStream, WriteObject } from './call-stream'; import { ChannelOptions } from './channel-options'; import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls'; import { ConnectivityState } from './connectivity-state'; @@ -30,7 +29,6 @@ import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; import * as net from 'net'; import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; import { ConnectionOptions } from 'tls'; -import { FilterFactory, Filter, BaseFilter } from './filter'; import { stringToSubchannelAddress, SubchannelAddress, @@ -38,18 +36,16 @@ import { } from './subchannel-address'; import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; import { ConnectivityStateListener } from './subchannel-interface'; +import { Http2SubchannelCall } from './subchannel-call'; +import { getNextCallNumber } from './call-number'; +import { SubchannelCall } from './subchannel-call'; +import { InterceptingListener, StatusObject } from './call-interface'; const clientVersion = require('../../package.json').version; const TRACER_NAME = 'subchannel'; const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl'; -const MIN_CONNECT_TIMEOUT_MS = 20000; -const INITIAL_BACKOFF_MS = 1000; -const BACKOFF_MULTIPLIER = 1.6; -const MAX_BACKOFF_MS = 120000; -const BACKOFF_JITTER = 0.2; - /* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't * have a constant for the max signed 32 bit integer, so this is a simple way * to calculate it */ @@ -59,6 +55,8 @@ const KEEPALIVE_TIMEOUT_MS = 20000; export interface SubchannelCallStatsTracker { addMessageSent(): void; addMessageReceived(): void; + onCallEnd(status: StatusObject): void; + onStreamEnd(success: boolean): void; } const { @@ -70,15 +68,6 @@ const { HTTP2_HEADER_USER_AGENT, } = http2.constants; -/** - * Get a number uniformly at random in the range [min, max) - * @param min - * @param max - */ -function uniformRandom(min: number, max: number) { - return Math.random() * (max - min) + min; -} - const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii'); export class Subchannel { @@ -808,24 +797,13 @@ export class Subchannel { return false; } - /** - * Start a stream on the current session with the given `metadata` as headers - * and then attach it to the `callStream`. Must only be called if the - * subchannel's current connectivity state is READY. - * @param metadata - * @param callStream - */ - startCallStream( - metadata: Metadata, - callStream: Http2CallStream, - extraFilters: Filter[] - ) { + createCall(metadata: Metadata, host: string, method: string, listener: InterceptingListener): SubchannelCall { const headers = metadata.toHttp2Headers(); - headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); + headers[HTTP2_HEADER_AUTHORITY] = host; headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; headers[HTTP2_HEADER_METHOD] = 'POST'; - headers[HTTP2_HEADER_PATH] = callStream.getMethod(); + headers[HTTP2_HEADER_PATH] = method; headers[HTTP2_HEADER_TE] = 'trailers'; let http2Stream: http2.ClientHttp2Stream; /* In theory, if an error is thrown by session.request because session has @@ -845,19 +823,6 @@ export class Subchannel { ); throw e; } - let headersString = ''; - for (const header of Object.keys(headers)) { - headersString += '\t\t' + header + ': ' + headers[header] + '\n'; - } - logging.trace( - LogVerbosity.DEBUG, - 'call_stream', - 'Starting stream [' + callStream.getCallNumber() + '] on subchannel ' + - '(' + this.channelzRef.id + ') ' + - this.subchannelAddressString + - ' with headers\n' + - headersString - ); this.flowControlTrace( 'local window size: ' + this.session!.state.localWindowSize + @@ -875,23 +840,7 @@ export class Subchannel { let statsTracker: SubchannelCallStatsTracker; if (this.channelzEnabled) { this.callTracker.addCallStarted(); - callStream.addStatusWatcher(status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); this.streamTracker.addCallStarted(); - callStream.addStreamEndWatcher(success => { - if (streamSession === this.session) { - if (success) { - this.streamTracker.addCallSucceeded(); - } else { - this.streamTracker.addCallFailed(); - } - } - }); statsTracker = { addMessageSent: () => { this.messagesSent += 1; @@ -899,15 +848,33 @@ export class Subchannel { }, addMessageReceived: () => { this.messagesReceived += 1; + }, + onCallEnd: status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }, + onStreamEnd: success => { + if (streamSession === this.session) { + if (success) { + this.streamTracker.addCallSucceeded(); + } else { + this.streamTracker.addCallFailed(); + } + } } } } else { statsTracker = { addMessageSent: () => {}, - addMessageReceived: () => {} + addMessageReceived: () => {}, + onCallEnd: () => {}, + onStreamEnd: () => {} } } - callStream.attachHttp2Stream(http2Stream, this, extraFilters, statsTracker); + return new Http2SubchannelCall(http2Stream, statsTracker, listener, this, getNextCallNumber()); } /** diff --git a/packages/grpc-js/test/test-resolver.ts b/packages/grpc-js/test/test-resolver.ts index 354413ea6..1bcaabeb4 100644 --- a/packages/grpc-js/test/test-resolver.ts +++ b/packages/grpc-js/test/test-resolver.ts @@ -23,7 +23,7 @@ import * as resolver_dns from '../src/resolver-dns'; import * as resolver_uds from '../src/resolver-uds'; import * as resolver_ip from '../src/resolver-ip'; import { ServiceConfig } from '../src/service-config'; -import { StatusObject } from '../src/call-stream'; +import { StatusObject } from '../src/call-interface'; import { SubchannelAddress, isTcpSubchannelAddress, subchannelAddressToString } from "../src/subchannel-address"; import { parseUri, GrpcUri } from '../src/uri-parser'; From 8a312e63b719fe0166acb07c8a43f2a91253ea17 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 11 Oct 2022 16:50:49 -0700 Subject: [PATCH 2/4] grpc-js-xds: Update code to handle modified experimental APIs --- .../src/http-filter/fault-injection-filter.ts | 8 ++--- .../src/http-filter/router-filter.ts | 2 +- packages/grpc-js-xds/src/load-balancer-eds.ts | 28 +++-------------- packages/grpc-js-xds/src/load-balancer-lrs.ts | 31 +++---------------- .../src/load-balancer-xds-cluster-manager.ts | 4 +-- 5 files changed, 16 insertions(+), 57 deletions(-) diff --git a/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts b/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts index a49ec77d4..b02dfbc80 100644 --- a/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts +++ b/packages/grpc-js-xds/src/http-filter/fault-injection-filter.ts @@ -231,7 +231,7 @@ const NUMBER_REGEX = /\d+/; let totalActiveFaults = 0; class FaultInjectionFilter extends BaseFilter implements Filter { - constructor(private callStream: CallStream, private config: FaultInjectionConfig) { + constructor(private config: FaultInjectionConfig) { super(); } @@ -316,7 +316,7 @@ class FaultInjectionFilter extends BaseFilter implements Filter { } } if (abortStatus !== null && rollRandomPercentage(numerator, denominator)) { - this.callStream.cancelWithStatus(abortStatus, 'Fault injected'); + return Promise.reject({code: abortStatus, details: 'Fault injected', metadata: new Metadata()}); } } return metadata; @@ -333,8 +333,8 @@ class FaultInjectionFilterFactory implements FilterFactory } } - createFilter(callStream: experimental.CallStream): FaultInjectionFilter { - return new FaultInjectionFilter(callStream, this.config); + createFilter(): FaultInjectionFilter { + return new FaultInjectionFilter(this.config); } } diff --git a/packages/grpc-js-xds/src/http-filter/router-filter.ts b/packages/grpc-js-xds/src/http-filter/router-filter.ts index 81450c0ff..172a08740 100644 --- a/packages/grpc-js-xds/src/http-filter/router-filter.ts +++ b/packages/grpc-js-xds/src/http-filter/router-filter.ts @@ -26,7 +26,7 @@ class RouterFilter extends BaseFilter implements Filter {} class RouterFilterFactory implements FilterFactory { constructor(config: HttpFilterConfig, overrideConfig?: HttpFilterConfig) {} - createFilter(callStream: experimental.CallStream): RouterFilter { + createFilter(): RouterFilter { return new RouterFilter(); } } diff --git a/packages/grpc-js-xds/src/load-balancer-eds.ts b/packages/grpc-js-xds/src/load-balancer-eds.ts index e7aac0571..39ad2c2cb 100644 --- a/packages/grpc-js-xds/src/load-balancer-eds.ts +++ b/packages/grpc-js-xds/src/load-balancer-eds.ts @@ -146,24 +146,6 @@ export class EdsLoadBalancingConfig implements LoadBalancingConfig { } } -class CallEndTrackingFilter extends BaseFilter implements Filter { - constructor(private onCallEnd: () => void) { - super(); - } - receiveTrailers(status: StatusObject) { - this.onCallEnd(); - return status; - } -} - -class CallTrackingFilterFactory implements FilterFactory { - constructor(private onCallEnd: () => void) {} - - createFilter(callStream: CallStream) { - return new CallEndTrackingFilter(this.onCallEnd); - } -} - /** * This class load balances over a cluster by making an EDS request and then * transforming the result into a configuration for another load balancing @@ -217,9 +199,6 @@ export class EdsLoadBalancer implements LoadBalancer { * balancer. */ if (dropCategory === null) { const originalPick = originalPicker.pick(pickArgs); - const trackingFilterFactory: FilterFactory = new CallTrackingFilterFactory(() => { - this.concurrentRequests -= 1; - }); return { pickResultType: originalPick.pickResultType, status: originalPick.status, @@ -228,7 +207,10 @@ export class EdsLoadBalancer implements LoadBalancer { originalPick.onCallStarted?.(); this.concurrentRequests += 1; }, - extraFilterFactories: originalPick.extraFilterFactories.concat(trackingFilterFactory) + onCallEnded: status => { + originalPick.onCallEnded?.(status); + this.concurrentRequests -= 1; + } }; } else { let details: string; @@ -247,7 +229,7 @@ export class EdsLoadBalancer implements LoadBalancer { metadata: new Metadata(), }, subchannel: null, - extraFilterFactories: [], + onCallEnded: null, onCallStarted: null }; } diff --git a/packages/grpc-js-xds/src/load-balancer-lrs.ts b/packages/grpc-js-xds/src/load-balancer-lrs.ts index 145501fe9..0eaeeee34 100644 --- a/packages/grpc-js-xds/src/load-balancer-lrs.ts +++ b/packages/grpc-js-xds/src/load-balancer-lrs.ts @@ -108,29 +108,6 @@ export class LrsLoadBalancingConfig implements LoadBalancingConfig { } } -/** - * Filter class that reports when the call ends. - */ -class CallEndTrackingFilter extends BaseFilter implements Filter { - constructor(private localityStatsReporter: XdsClusterLocalityStats) { - super(); - } - - receiveTrailers(status: StatusObject) { - this.localityStatsReporter.addCallFinished(status.code !== Status.OK); - return status; - } -} - -class CallEndTrackingFilterFactory - implements FilterFactory { - constructor(private localityStatsReporter: XdsClusterLocalityStats) {} - - createFilter(callStream: Call): CallEndTrackingFilter { - return new CallEndTrackingFilter(this.localityStatsReporter); - } -} - /** * Picker that delegates picking to another picker, and reports when calls * created using those picks start and end. @@ -144,9 +121,6 @@ class LoadReportingPicker implements Picker { pick(pickArgs: PickArgs): PickResult { const wrappedPick = this.wrappedPicker.pick(pickArgs); if (wrappedPick.pickResultType === PickResultType.COMPLETE) { - const trackingFilterFactory = new CallEndTrackingFilterFactory( - this.localityStatsReporter - ); return { pickResultType: PickResultType.COMPLETE, subchannel: wrappedPick.subchannel, @@ -155,7 +129,10 @@ class LoadReportingPicker implements Picker { wrappedPick.onCallStarted?.(); this.localityStatsReporter.addCallStarted(); }, - extraFilterFactories: wrappedPick.extraFilterFactories.concat(trackingFilterFactory), + onCallEnded: status => { + wrappedPick.onCallEnded?.(status); + this.localityStatsReporter.addCallFinished(status !== Status.OK); + } }; } else { return wrappedPick; diff --git a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts index bfc55c809..bfdb4dccc 100644 --- a/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts +++ b/packages/grpc-js-xds/src/load-balancer-xds-cluster-manager.ts @@ -107,8 +107,8 @@ class XdsClusterManagerPicker implements Picker { metadata: new Metadata(), }, subchannel: null, - extraFilterFactories: [], - onCallStarted: null + onCallStarted: null, + onCallEnded: null }; } } From 59a2cbceeb288b0967dca7daafee254292bc0e1a Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 14 Oct 2022 10:27:38 -0700 Subject: [PATCH 3/4] grpc-js: Remove redundant calls to setCredentials --- packages/grpc-js/src/client-interceptors.ts | 8 -------- packages/grpc-js/src/client.ts | 12 ------------ 2 files changed, 20 deletions(-) diff --git a/packages/grpc-js/src/client-interceptors.ts b/packages/grpc-js/src/client-interceptors.ts index 52320b0b3..d9c88f448 100644 --- a/packages/grpc-js/src/client-interceptors.ts +++ b/packages/grpc-js/src/client-interceptors.ts @@ -198,8 +198,6 @@ export interface InterceptingCallInterface { sendMessage(message: any): void; startRead(): void; halfClose(): void; - - setCredentials(credentials: CallCredentials): void; } export class InterceptingCall implements InterceptingCallInterface { @@ -337,9 +335,6 @@ export class InterceptingCall implements InterceptingCallInterface { } }); } - setCredentials(credentials: CallCredentials): void { - this.nextCall.setCredentials(credentials); - } } function getCall(channel: Channel, path: string, options: CallOptions): Call { @@ -371,9 +366,6 @@ class BaseInterceptingCall implements InterceptingCallInterface { getPeer(): string { return this.call.getPeer(); } - setCredentials(credentials: CallCredentials): void { - this.call.setCredentials(credentials); - } // eslint-disable-next-line @typescript-eslint/no-explicit-any sendMessageWithContext(context: MessageContext, message: any): void { let serialized: Buffer; diff --git a/packages/grpc-js/src/client.ts b/packages/grpc-js/src/client.ts index d97c9fa3f..f96f8fdf0 100644 --- a/packages/grpc-js/src/client.ts +++ b/packages/grpc-js/src/client.ts @@ -321,9 +321,6 @@ export class Client { * before calling the CallInvocationTransformer, and we need to create the * call after that. */ emitter.call = call; - if (callProperties.callOptions.credentials) { - call.setCredentials(callProperties.callOptions.credentials); - } let responseMessage: ResponseType | null = null; let receivedStatus = false; const callerStackError = new Error(); @@ -449,9 +446,6 @@ export class Client { * before calling the CallInvocationTransformer, and we need to create the * call after that. */ emitter.call = call; - if (callProperties.callOptions.credentials) { - call.setCredentials(callProperties.callOptions.credentials); - } let responseMessage: ResponseType | null = null; let receivedStatus = false; const callerStackError = new Error(); @@ -582,9 +576,6 @@ export class Client { * before calling the CallInvocationTransformer, and we need to create the * call after that. */ stream.call = call; - if (callProperties.callOptions.credentials) { - call.setCredentials(callProperties.callOptions.credentials); - } let receivedStatus = false; const callerStackError = new Error(); call.start(callProperties.metadata, { @@ -681,9 +672,6 @@ export class Client { * before calling the CallInvocationTransformer, and we need to create the * call after that. */ stream.call = call; - if (callProperties.callOptions.credentials) { - call.setCredentials(callProperties.callOptions.credentials); - } let receivedStatus = false; const callerStackError = new Error(); call.start(callProperties.metadata, { From 63d9f6a6d68ceef57e72f02e37c91d140ae32196 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 14 Oct 2022 11:18:26 -0700 Subject: [PATCH 4/4] Ensure ordering between received messages and status --- packages/grpc-js/src/load-balancing-call.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index 36af8b683..73c1fa9fd 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -44,6 +44,8 @@ export class LoadBalancingCall implements Call { private writeFilterPending = false; private pendingMessage: {context: MessageContext, message: Buffer} | null = null; private pendingHalfClose = false; + private readFilterPending = false; + private pendingChildStatus: StatusObject | null = null; private ended = false; private serviceUrl: string; private filterStack: FilterStack; @@ -153,14 +155,23 @@ export class LoadBalancingCall implements Call { this.listener!.onReceiveMetadata(this.filterStack.receiveMetadata(metadata)); }, onReceiveMessage: message => { + this.readFilterPending = true; this.filterStack.receiveMessage(message).then(filteredMesssage => { + this.readFilterPending = false; this.listener!.onReceiveMessage(filteredMesssage); + if (this.pendingChildStatus) { + this.outputStatus(this.pendingChildStatus, 'PROCESSED'); + } }, (status: StatusObject) => { this.cancelWithStatus(status.code, status.details); }); }, onReceiveStatus: status => { - this.outputStatus(status, 'PROCESSED'); + if (this.readFilterPending) { + this.pendingChildStatus = status; + } else { + this.outputStatus(status, 'PROCESSED'); + } } }); } catch (error) {