From a42d6b4f5c7351f700149af1e845ee4105e50e6f Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 9 Nov 2022 14:53:19 -0800 Subject: [PATCH] grpc-js: Implement server connection management --- packages/grpc-js/src/channel-options.ts | 4 ++ packages/grpc-js/src/server.ts | 74 ++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index b7fc92fa9..c05253e9b 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -44,6 +44,8 @@ export interface ChannelOptions { 'grpc.default_compression_algorithm'?: CompressionAlgorithms; 'grpc.enable_channelz'?: number; 'grpc.dns_min_time_between_resolutions_ms'?: number; + 'grpc.max_connection_age_ms'?: number; + 'grpc.max_connection_age_grace_ms'?: number; 'grpc-node.max_session_memory'?: number; // eslint-disable-next-line @typescript-eslint/no-explicit-any [key: string]: any; @@ -71,6 +73,8 @@ export const recognizedOptions = { 'grpc.enable_http_proxy': true, 'grpc.enable_channelz': true, 'grpc.dns_min_time_between_resolutions_ms': true, + 'grpc.max_connection_age_ms': true, + 'grpc.max_connection_age_grace_ms': true, 'grpc-node.max_session_memory': true, }; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 9f7321408..d19186a75 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -63,6 +63,10 @@ import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerCh import { CipherNameAndProtocol, TLSSocket } from 'tls'; import { getErrorCode, getErrorMessage } from './error'; +const UNLIMITED_CONNECTION_AGE_MS = ~(1<<31); +const KEEPALIVE_MAX_TIME_MS = ~(1<<31); +const KEEPALIVE_TIMEOUT_MS = 20000; + const { HTTP2_HEADER_PATH } = http2.constants @@ -161,6 +165,12 @@ export class Server { private listenerChildrenTracker = new ChannelzChildrenTracker(); private sessionChildrenTracker = new ChannelzChildrenTracker(); + private readonly maxConnectionAgeMs: number; + private readonly maxConnectionAgeGraceMs: number; + + private readonly keepaliveTimeMs: number; + private readonly keepaliveTimeoutMs: number; + constructor(options?: ChannelOptions) { this.options = options ?? {}; if (this.options['grpc.enable_channelz'] === 0) { @@ -170,7 +180,10 @@ export class Server { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Server created'); } - + this.maxConnectionAgeMs = this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS; + this.maxConnectionAgeGraceMs = this.options['grpc.max_connection_age_grace_ms'] ?? UNLIMITED_CONNECTION_AGE_MS; + this.keepaliveTimeMs = this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS; + this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; this.trace('Server constructed'); } @@ -970,12 +983,69 @@ export class Server { this.channelzTrace.addTrace('CT_INFO', 'Connection established by client ' + clientAddress); this.sessionChildrenTracker.refChild(channelzRef); } + let connectionAgeTimer: NodeJS.Timer | null = null; + let connectionAgeGraceTimer: NodeJS.Timer | null = null; + let sessionClosedByServer = false; + if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { + // Apply a random jitter within a +/-10% range + const jitterMagnitude = this.maxConnectionAgeMs / 10; + const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; + connectionAgeTimer = setTimeout(() => { + sessionClosedByServer = true; + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by max connection age from ' + clientAddress); + } + try { + session.goaway(http2.constants.NGHTTP2_NO_ERROR, ~(1<<31), Buffer.from('max_age')); + } catch (e) { + // The goaway can't be sent because the session is already closed + session.destroy(); + return; + } + session.close(); + /* Allow a grace period after sending the GOAWAY before forcibly + * closing the connection. */ + if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { + connectionAgeGraceTimer = setTimeout(() => { + session.destroy(); + }, this.maxConnectionAgeGraceMs).unref?.(); + } + }, this.maxConnectionAgeMs + jitter).unref?.(); + } + const keeapliveTimeTimer: NodeJS.Timer | null = setInterval(() => { + const timeoutTImer = setTimeout(() => { + sessionClosedByServer = true; + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by keepalive timeout from ' + clientAddress); + } + session.close(); + }, this.keepaliveTimeoutMs).unref?.(); + try { + session.ping((err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTImer); + }); + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); + } + }, this.keepaliveTimeMs).unref?.(); session.on('close', () => { if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); + if (!sessionClosedByServer) { + this.channelzTrace.addTrace('CT_INFO', 'Connection dropped by client ' + clientAddress); + } this.sessionChildrenTracker.unrefChild(channelzRef); unregisterChannelzRef(channelzRef); } + if (connectionAgeTimer) { + clearTimeout(connectionAgeTimer); + } + if (connectionAgeGraceTimer) { + clearTimeout(connectionAgeGraceTimer); + } + if (keeapliveTimeTimer) { + clearTimeout(keeapliveTimeTimer); + } this.sessions.delete(session); }); });