diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index a8428b249..ec3f4a972 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.1.6", + "version": "1.1.7", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 3195ebc2f..dcfae483d 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -48,6 +48,11 @@ export enum ConnectivityState { SHUTDOWN, } +/** + * See https://nodejs.org/api/timers.html#timers_setinterval_callback_delay_args + */ +const MAX_TIMEOUT_TIME = 2147483647; + let nextCallNumber = 0; function getNewCallNumber(): number { @@ -137,6 +142,14 @@ export class ChannelImplementation implements Channel { private defaultAuthority: string; private filterStackFactory: FilterStackFactory; private target: GrpcUri; + /** + * This timer does not do anything on its own. Its purpose is to hold the + * event loop open while there are any pending calls for the channel that + * have not yet been assigned to specific subchannels. In other words, + * the invariant is that callRefTimer is reffed if and only if pickQueue + * is non-empty. + */ + private callRefTimer: NodeJS.Timer; constructor( target: string, private readonly credentials: ChannelCredentials, @@ -177,6 +190,10 @@ export class ChannelImplementation implements Channel { `Could not find a default scheme for target name "${target}"` ); } + + this.callRefTimer = setInterval(() => {}, MAX_TIMEOUT_TIME); + this.callRefTimer.unref?.(); + if (this.options['grpc.default_authority']) { this.defaultAuthority = this.options['grpc.default_authority'] as string; } else { @@ -206,6 +223,7 @@ export class ChannelImplementation implements Channel { updateState: (connectivityState: ConnectivityState, picker: Picker) => { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); + this.callRefTimer.unref?.(); this.pickQueue = []; for (const { callStream, callMetadata } of queueCopy) { this.tryPick(callStream, callMetadata); @@ -232,6 +250,11 @@ export class ChannelImplementation implements Channel { ]); } + private pushPick(callStream: Http2CallStream, callMetadata: Metadata) { + this.callRefTimer.ref?.(); + this.pickQueue.push({ callStream, callMetadata }); + } + /** * Check the picker output for the given call and corresponding metadata, * and take any relevant actions. Should not be called while iterating @@ -276,7 +299,7 @@ export class ChannelImplementation implements Channel { ' has state ' + ConnectivityState[pickResult.subchannel!.getConnectivityState()] ); - this.pickQueue.push({ callStream, callMetadata }); + this.pushPick(callStream, callMetadata); break; } /* We need to clone the callMetadata here because the transparent @@ -367,11 +390,11 @@ export class ChannelImplementation implements Channel { } break; case PickResultType.QUEUE: - this.pickQueue.push({ callStream, callMetadata }); + this.pushPick(callStream, callMetadata); break; case PickResultType.TRANSIENT_FAILURE: if (callMetadata.getOptions().waitForReady) { - this.pickQueue.push({ callStream, callMetadata }); + this.pushPick(callStream, callMetadata); } else { callStream.cancelWithStatus( pickResult.status!.code, @@ -433,6 +456,7 @@ export class ChannelImplementation implements Channel { close() { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); + clearInterval(this.callRefTimer); this.subchannelPool.unrefUnusedSubchannels(); }