From 2791dbbb2399af32080f942b38411c182473ab2b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 17 Sep 2020 11:25:56 -0700 Subject: [PATCH 1/3] grpc-js: Add a timer that holds the event loop open while there are pending calls --- packages/grpc-js/src/channel.ts | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 3195ebc2f..5edaf370d 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -137,6 +137,14 @@ export class ChannelImplementation implements Channel { private defaultAuthority: string; private filterStackFactory: FilterStackFactory; private target: GrpcUri; + /** + * This timer does not do anything on its own. Its purpose is to hold the + * event loop open while there are any pending calls for the channel that + * have not yet been assigned to specific subchannels. In other words, + * the invariant is that callRefTimer is reffed if and only if pickQueue + * is non-empty. + */ + private callRefTimer: NodeJS.Timer; constructor( target: string, private readonly credentials: ChannelCredentials, @@ -206,6 +214,7 @@ export class ChannelImplementation implements Channel { updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); + this.callRefTimer.unref?.(); this.pickQueue = []; for (const { callStream, callMetadata } of queueCopy) { this.tryPick(callStream, callMetadata); @@ -230,6 +239,14 @@ export class ChannelImplementation implements Channel { new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this), ]); + + this.callRefTimer = setInterval(() => {}, 1 << 31 - 1); + this.callRefTimer.unref?.(); + } + + private pushPick(callStream: Http2CallStream, callMetadata: Metadata) { + this.callRefTimer.ref?.(); + this.pickQueue.push({ callStream, callMetadata }); } /** @@ -276,7 +293,7 @@ export class ChannelImplementation implements Channel { ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); - this.pickQueue.push({ callStream, callMetadata }); + this.pushPick(callStream, callMetadata); break; } /* We need to clone the callMetadata here because the transparent @@ -367,11 +384,11 @@ export class ChannelImplementation implements Channel { } break; case PickResultType.QUEUE: - this.pickQueue.push({ callStream, callMetadata }); + this.pushPick(callStream, callMetadata); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { - this.pickQueue.push({ callStream, callMetadata }); + this.pushPick(callStream, callMetadata); } else { callStream.cancelWithStatus( pickResult.status!.code, @@ -433,6 +450,7 @@ export class ChannelImplementation implements Channel { close() { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); + clearInterval(this.callRefTimer); this.subchannelPool.unrefUnusedSubchannels(); } From 6a32c00ed2cce4179bef5a6df100b879e1f82cca Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 17 Sep 2020 12:15:47 -0700 Subject: [PATCH 2/3] Bump grpc-js to 1.1.7 --- packages/grpc-js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index a8428b249..ec3f4a972 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.1.6", + "version": "1.1.7", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", From a1600812d19cf96e501356cf6e97bd237577a204 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Thu, 17 Sep 2020 13:18:11 -0700 Subject: [PATCH 3/3] Move timer initialization to the beginning of channel construction --- packages/grpc-js/src/channel.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 5edaf370d..dcfae483d 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -48,6 +48,11 @@ export enum ConnectivityState { SHUTDOWN, } +/** + * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args + */ +const MAX_TIMEOUT_TIME = 2147483647; + let nextCallNumber = 0; function getNewCallNumber(): number { @@ -185,6 +190,10 @@ export class ChannelImplementation implements Channel { `Could not find a default scheme for target name "${target}"` ); } + + this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); + this.callRefTimer.unref?.(); + if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority'] as string; } else { @@ -239,9 +248,6 @@ export class ChannelImplementation implements Channel { new MaxMessageSizeFilterFactory(this.options), new CompressionFilterFactory(this), ]); - - this.callRefTimer = setInterval(() => {}, 1 << 31 - 1); - this.callRefTimer.unref?.(); } private pushPick(callStream: Http2CallStream, callMetadata: Metadata) {