From 2f124ad68bfe5d9df1130d7c25ea340a50b87a58 Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 19 Oct 2022 14:48:11 -0700 Subject: [PATCH 1/2] fix: perf issues in hot paths 1. no unused timers, wrap tracing calls to avoid stringifying 2. track graceful end of the call and avoid emitting 'cancelled' in such cases 3. remove validate calls in metadata on operations where it's not needed 4. refactor server session stream handlers into separate channelz enabled/disabled handlers 5. refactor message request logic - reduce amount of microtasks generated 6. improve sendStatus a little when there is no metadata involved --- packages/grpc-js/src/call-stream.ts | 4 + .../generated/grpc/channelz/v1/Channelz.ts | 104 ++--- packages/grpc-js/src/metadata.ts | 64 ++- packages/grpc-js/src/server-call.ts | 305 +++++++------ packages/grpc-js/src/server.ts | 413 +++++++++++------- 5 files changed, 502 insertions(+), 388 deletions(-) diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index d7151eb6f..488e35fc8 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -97,6 +97,10 @@ export interface StatusObject { metadata: Metadata; } +export type PartialStatusObject = Pick & { + metadata: Metadata | null; +} + export const enum WriteFlags { BufferHint = 1, NoCompress = 2, diff --git a/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts b/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts index ace712454..4c8c18aa7 100644 --- a/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts +++ b/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts @@ -25,102 +25,102 @@ export interface ChannelzClient extends grpc.Client { /** * Returns a single Channel, or else a NOT_FOUND code. */ - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Server, or else a NOT_FOUND code. */ - GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Server, or else a NOT_FOUND code. */ - getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; /** * Gets all server sockets that exist in the process. */ - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; /** * Gets all server sockets that exist in the process. */ - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; /** * Gets all servers that exist in the process. */ - GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; /** * Gets all servers that exist in the process. */ - getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Socket or else a NOT_FOUND code. */ - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Socket or else a NOT_FOUND code. */ - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Subchannel, or else a NOT_FOUND code. */ - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Subchannel, or else a NOT_FOUND code. */ - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; /** * Gets all root channels (i.e. channels the application has directly * created). This does not include subchannels nor non-top level channels. */ - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; /** * Gets all root channels (i.e. channels the application has directly * created). This does not include subchannels nor non-top level channels. */ - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; } diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index 04db642ef..376cb491a 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -48,13 +48,14 @@ function validate(key: string, value?: MetadataValue): void { if (!isLegalKey(key)) { throw new Error('Metadata key "' + key + '" contains illegal characters'); } + if (value !== null && value !== undefined) { if (isBinaryKey(key)) { - if (!(value instanceof Buffer)) { + if (!Buffer.isBuffer(value)) { throw new Error("keys that end with '-bin' must have Buffer values"); } } else { - if (value instanceof Buffer) { + if (Buffer.isBuffer(value)) { throw new Error( "keys that don't end with '-bin' must have String values" ); @@ -88,12 +89,8 @@ export class Metadata { protected internalRepr: MetadataObject = new Map(); private options: MetadataOptions; - constructor(options?: MetadataOptions) { - if (options === undefined) { - this.options = {}; - } else { - this.options = options; - } + constructor(options: MetadataOptions = {}) { + this.options = options; } /** @@ -120,9 +117,7 @@ export class Metadata { key = normalizeKey(key); validate(key, value); - const existingValue: MetadataValue[] | undefined = this.internalRepr.get( - key - ); + const existingValue: MetadataValue[] | undefined = this.internalRepr.get(key); if (existingValue === undefined) { this.internalRepr.set(key, [value]); @@ -137,7 +132,7 @@ export class Metadata { */ remove(key: string): void { key = normalizeKey(key); - validate(key); + // validate(key); this.internalRepr.delete(key); } @@ -148,7 +143,7 @@ export class Metadata { */ get(key: string): MetadataValue[] { key = normalizeKey(key); - validate(key); + // validate(key); return this.internalRepr.get(key) || []; } @@ -160,12 +155,12 @@ export class Metadata { getMap(): { [key: string]: MetadataValue } { const result: { [key: string]: MetadataValue } = {}; - this.internalRepr.forEach((values, key) => { + for (const [key, values] of this.internalRepr) { if (values.length > 0) { const v = values[0]; - result[key] = v instanceof Buffer ? v.slice() : v; + result[key] = Buffer.isBuffer(v) ? Buffer.from(v) : v; } - }); + } return result; } @@ -177,9 +172,9 @@ export class Metadata { const newMetadata = new Metadata(this.options); const newInternalRepr = newMetadata.internalRepr; - this.internalRepr.forEach((value, key) => { + for (const [key, value] of this.internalRepr) { const clonedValue: MetadataValue[] = value.map((v) => { - if (v instanceof Buffer) { + if (Buffer.isBuffer(v)) { return Buffer.from(v); } else { return v; @@ -187,7 +182,7 @@ export class Metadata { }); newInternalRepr.set(key, clonedValue); - }); + } return newMetadata; } @@ -200,13 +195,13 @@ export class Metadata { * @param other A Metadata object. */ merge(other: Metadata): void { - other.internalRepr.forEach((values, key) => { + for (const [key, values] of other.internalRepr) { const mergedValue: MetadataValue[] = ( this.internalRepr.get(key) || [] ).concat(values); this.internalRepr.set(key, mergedValue); - }); + } } setOptions(options: MetadataOptions) { @@ -223,17 +218,13 @@ export class Metadata { toHttp2Headers(): http2.OutgoingHttpHeaders { // NOTE: Node <8.9 formats http2 headers incorrectly. const result: http2.OutgoingHttpHeaders = {}; - this.internalRepr.forEach((values, key) => { + + for (const [key, values] of this.internalRepr) { // We assume that the user's interaction with this object is limited to // through its public API (i.e. keys and values are already validated). - result[key] = values.map((value) => { - if (value instanceof Buffer) { - return value.toString('base64'); - } else { - return value; - } - }); - }); + result[key] = values.map(bufToString); + } + return result; } @@ -248,7 +239,7 @@ export class Metadata { */ toJSON() { const result: { [key: string]: MetadataValue[] } = {}; - for (const [key, values] of this.internalRepr.entries()) { + for (const [key, values] of this.internalRepr) { result[key] = values; } return result; @@ -261,10 +252,10 @@ export class Metadata { */ static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata { const result = new Metadata(); - Object.keys(headers).forEach((key) => { + for (const key of Object.keys(headers)) { // Reserved headers (beginning with `:`) are not valid keys. if (key.charAt(0) === ':') { - return; + continue; } const values = headers[key]; @@ -297,7 +288,12 @@ export class Metadata { const message = `Failed to add metadata entry ${key}: ${values}. ${error.message}. For more information see https://github.com/grpc/grpc-node/issues/1173`; log(LogVerbosity.ERROR, message); } - }); + } + return result; } } + +const bufToString = (val: string | Buffer): string => { + return Buffer.isBuffer(val) ? val.toString('base64') : val +}; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 1f7f3eb04..84ff7c8ee 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,8 +19,9 @@ import { EventEmitter } from 'events'; import * as http2 from 'http2'; import { Duplex, Readable, Writable } from 'stream'; import * as zlib from 'zlib'; +import { promisify } from 'util'; -import { Deadline, StatusObject } from './call-stream'; +import { Deadline, StatusObject, PartialStatusObject } from './call-stream'; import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, @@ -35,6 +36,8 @@ import { ChannelOptions } from './channel-options'; import * as logging from './logging'; const TRACER_NAME = 'server_call'; +const unzip = promisify(zlib.unzip); +const inflate = promisify(zlib.inflate); function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); @@ -86,25 +89,22 @@ export type ServerSurfaceCall = { export type ServerUnaryCall = ServerSurfaceCall & { request: RequestType; }; -export type ServerReadableStream< - RequestType, - ResponseType -> = ServerSurfaceCall & ObjectReadable; -export type ServerWritableStream< - RequestType, - ResponseType -> = ServerSurfaceCall & - ObjectWritable & { - request: RequestType; - end: (metadata?: Metadata) => void; - }; +export type ServerReadableStream = + ServerSurfaceCall & ObjectReadable; +export type ServerWritableStream = + ServerSurfaceCall & + ObjectWritable & { + request: RequestType; + end: (metadata?: Metadata) => void; + }; export type ServerDuplexStream = ServerSurfaceCall & ObjectReadable & ObjectWritable & { end: (metadata?: Metadata) => void }; export class ServerUnaryCallImpl extends EventEmitter - implements ServerUnaryCall { + implements ServerUnaryCall +{ cancelled: boolean; constructor( @@ -136,7 +136,8 @@ export class ServerUnaryCallImpl export class ServerReadableStreamImpl extends Readable - implements ServerReadableStream { + implements ServerReadableStream +{ cancelled: boolean; constructor( @@ -178,7 +179,8 @@ export class ServerReadableStreamImpl export class ServerWritableStreamImpl extends Writable - implements ServerWritableStream { + implements ServerWritableStream +{ cancelled: boolean; private trailingMetadata: Metadata; @@ -257,7 +259,8 @@ export class ServerWritableStreamImpl export class ServerDuplexStreamImpl extends Duplex - implements ServerDuplexStream { + implements ServerDuplexStream +{ cancelled: boolean; private trailingMetadata: Metadata; @@ -395,7 +398,8 @@ export class Http2ServerCallStream< ResponseType > extends EventEmitter { cancelled = false; - deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); + deadlineTimer: NodeJS.Timer | null = null; + private statusSent = false; private deadline: Deadline = Infinity; private wantTrailers = false; private metadataSent = false; @@ -428,10 +432,20 @@ export class Http2ServerCallStream< ' stream closed with rstCode ' + this.stream.rstCode ); - this.cancelled = true; - this.emit('cancelled', 'cancelled'); - this.emit('streamEnd', false); - this.sendStatus({code: Status.CANCELLED, details: 'Cancelled by client', metadata: new Metadata()}); + + if (!this.statusSent) { + this.cancelled = true; + this.emit('cancelled', 'cancelled'); + this.emit('streamEnd', false); + this.sendStatus({ + code: Status.CANCELLED, + details: 'Cancelled by client', + metadata: null, + }); + } + + // to compensate for a fact that cancelled is not always called + this.emit('close'); }); this.stream.on('drain', () => { @@ -444,9 +458,6 @@ export class Http2ServerCallStream< if ('grpc.max_receive_message_length' in options) { this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; } - - // Clear noop timer - clearTimeout(this.deadlineTimer); } private checkCancelled(): boolean { @@ -458,52 +469,22 @@ export class Http2ServerCallStream< return this.cancelled; } - private getDecompressedMessage(message: Buffer, encoding: string) { - switch (encoding) { - case 'deflate': { - return new Promise((resolve, reject) => { - zlib.inflate(message.slice(5), (err, output) => { - if (err) { - this.sendError({ - code: Status.INTERNAL, - details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, - }); - resolve(); - } else { - resolve(output); - } - }); - }); - } - - case 'gzip': { - return new Promise((resolve, reject) => { - zlib.unzip(message.slice(5), (err, output) => { - if (err) { - this.sendError({ - code: Status.INTERNAL, - details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, - }); - resolve(); - } else { - resolve(output); - } - }); - }); - } - - case 'identity': { - return Promise.resolve(message.slice(5)); - } - - default: { - this.sendError({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, - }); - return Promise.resolve(); - } + private getDecompressedMessage( + message: Buffer, + encoding: string + ): Buffer | Promise { + if (encoding === 'deflate') { + return inflate(message.subarray(5)); + } else if (encoding === 'gzip') { + return unzip(message.subarray(5)); + } else if (encoding === 'identity') { + return message.subarray(5); } + + return Promise.reject({ + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }); } sendMetadata(customMetadata?: Metadata) { @@ -518,13 +499,22 @@ export class Http2ServerCallStream< this.metadataSent = true; const custom = customMetadata ? customMetadata.toHttp2Headers() : null; // TODO(cjihrig): Include compression headers. - const headers = Object.assign({}, defaultResponseHeaders, custom); + const headers = { ...defaultResponseHeaders, ...custom }; this.stream.respond(headers, defaultResponseOptions); } receiveMetadata(headers: http2.IncomingHttpHeaders) { const metadata = Metadata.fromHttp2Headers(headers); + if (logging.isTracerEnabled(TRACER_NAME)) { + trace( + 'Request to ' + + this.handler.path + + ' received headers ' + + JSON.stringify(metadata.toJSON()) + ); + } + // TODO(cjihrig): Receive compression metadata. const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER); @@ -556,52 +546,95 @@ export class Http2ServerCallStream< return metadata; } - receiveUnaryMessage(encoding: string): Promise { - return new Promise((resolve, reject) => { - const stream = this.stream; - const chunks: Buffer[] = []; - let totalLength = 0; + receiveUnaryMessage( + encoding: string, + next: ( + err: Partial | null, + request?: RequestType + ) => void + ): void { + const { stream } = this; + + let receivedLength = 0; + const call = this; + const body: Buffer[] = []; + const limit = this.maxReceiveMessageSize; + + stream.on('data', onData); + stream.on('end', onEnd); + stream.on('error', onEnd); + + function onData(chunk: Buffer) { + receivedLength += chunk.byteLength; + + if (limit !== -1 && receivedLength > limit) { + stream.removeListener('data', onData); + stream.removeListener('end', onEnd); + stream.removeListener('error', onEnd); + next({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message larger than max (${receivedLength} vs. ${limit})`, + }); + return; + } - stream.on('data', (data: Buffer) => { - chunks.push(data); - totalLength += data.byteLength; - }); + body.push(chunk); + } - stream.once('end', async () => { - try { - const requestBytes = Buffer.concat(chunks, totalLength); - if ( - this.maxReceiveMessageSize !== -1 && - requestBytes.length > this.maxReceiveMessageSize - ) { - this.sendError({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`, - }); - resolve(); - } - - this.emit('receiveMessage'); - - const compressed = requestBytes.readUInt8(0) === 1; - const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding); - - // Encountered an error with decompression; it'll already have been propogated back - // Just return early - if (!decompressedMessage) { - resolve(); - } - else { - resolve(this.deserializeMessage(decompressedMessage)); - } - } catch (err) { - err.code = Status.INTERNAL; - this.sendError(err); - resolve(); - } - }); - }); + function onEnd(err?: Error) { + stream.removeListener('data', onData); + stream.removeListener('end', onEnd); + stream.removeListener('error', onEnd); + + if (err !== undefined) { + next({ code: Status.INTERNAL, details: err.message }); + return; + } + + if (receivedLength === 0) { + next({ code: Status.INTERNAL, details: 'received empty unary message' }) + return; + } + + call.emit('receiveMessage'); + + const requestBytes = Buffer.concat(body, receivedLength); + const compressed = requestBytes.readUInt8(0) === 1; + const compressedMessageEncoding = compressed ? encoding : 'identity'; + const decompressedMessage = call.getDecompressedMessage( + requestBytes, + compressedMessageEncoding + ); + + if (Buffer.isBuffer(decompressedMessage)) { + call.safeDeserializeMessage(decompressedMessage, next); + return; + } + + decompressedMessage.then( + (decompressed) => call.safeDeserializeMessage(decompressed, next), + (err: any) => next( + err.code + ? err + : { + code: Status.INTERNAL, + details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, + } + ) + ) + } + } + + private safeDeserializeMessage( + buffer: Buffer, + next: (err: Partial | null, request?: RequestType) => void + ) { + try { + next(null, this.deserializeMessage(buffer)); + } catch (err) { + err.code = Status.INTERNAL; + next(err); + } } serializeMessage(value: ResponseType) { @@ -623,18 +656,19 @@ export class Http2ServerCallStream< async sendUnaryMessage( err: ServerErrorResponse | ServerStatusResponse | null, value?: ResponseType | null, - metadata?: Metadata, + metadata?: Metadata | null, flags?: number ) { if (this.checkCancelled()) { return; } - if (!metadata) { - metadata = new Metadata(); + + if (metadata === undefined) { + metadata = null; } if (err) { - if (!Object.prototype.hasOwnProperty.call(err, 'metadata')) { + if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) { err.metadata = metadata; } this.sendError(err); @@ -652,7 +686,7 @@ export class Http2ServerCallStream< } } - sendStatus(statusObj: StatusObject) { + sendStatus(statusObj: PartialStatusObject) { this.emit('callEnd', statusObj.code); this.emit('streamEnd', statusObj.code === Status.OK); if (this.checkCancelled()) { @@ -668,20 +702,19 @@ export class Http2ServerCallStream< statusObj.details ); - clearTimeout(this.deadlineTimer); + if (this.deadlineTimer) clearTimeout(this.deadlineTimer); if (!this.wantTrailers) { this.wantTrailers = true; this.stream.once('wantTrailers', () => { - const trailersToSend = Object.assign( - { - [GRPC_STATUS_HEADER]: statusObj.code, - [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string), - }, - statusObj.metadata.toHttp2Headers() - ); + const trailersToSend = { + [GRPC_STATUS_HEADER]: statusObj.code, + [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), + ...statusObj.metadata?.toHttp2Headers(), + }; this.stream.sendTrailers(trailersToSend); + this.statusSent = true; }); this.sendMetadata(); this.stream.end(); @@ -689,13 +722,13 @@ export class Http2ServerCallStream< } sendError(error: ServerErrorResponse | ServerStatusResponse) { - const status: StatusObject = { + const status: PartialStatusObject = { code: Status.UNKNOWN, details: 'message' in error ? error.message : 'Unknown Error', metadata: 'metadata' in error && error.metadata !== undefined ? error.metadata - : new Metadata(), + : null, }; if ( @@ -744,6 +777,9 @@ export class Http2ServerCallStream< call.emit('cancelled', reason); }); + // to compensate for the fact that cancelled is no longer always called + this.once('close', () => call.emit('close')) + this.once('callEnd', (status) => call.emit('callEnd', status)); } @@ -766,7 +802,7 @@ export class Http2ServerCallStream< pushedEnd = true; this.pushOrBufferMessage(readable, null); } - } + }; this.stream.on('data', async (data: Buffer) => { const messages = decoder.write(data); @@ -788,12 +824,15 @@ export class Http2ServerCallStream< const compressed = message.readUInt8(0) === 1; const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding); + const decompressedMessage = await this.getDecompressedMessage( + message, + compressedMessageEncoding + ); // Encountered an error with decompression; it'll already have been propogated back // Just return early if (!decompressedMessage) return; - + this.pushOrBufferMessage(readable, decompressedMessage); } pendingMessageProcessing = false; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 499fde4a3..2e89f459b 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -62,6 +62,10 @@ import { parseUri } from './uri-parser'; import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz'; import { CipherNameAndProtocol, TLSSocket } from 'tls'; +const { + HTTP2_HEADER_PATH +} = http2.constants + const TRACER_NAME = 'server'; interface BindResult { @@ -77,7 +81,6 @@ function getUnimplementedStatusResponse( return { code: Status.UNIMPLEMENTED, details: `The server does not implement the method ${methodName}`, - metadata: new Metadata(), }; } @@ -147,6 +150,7 @@ export class Server { private sessions = new Map(); private started = false; private options: ChannelOptions; + private serverAddressString: string = 'null' // Channelz Info private readonly channelzEnabled: boolean = true; @@ -165,6 +169,7 @@ export class Server { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Server created'); } + this.trace('Server constructed'); } @@ -730,150 +735,210 @@ export class Server { return this.channelzRef; } - private _setupHandlers( - http2Server: http2.Http2Server | http2.Http2SecureServer - ): void { - if (http2Server === null) { - return; + private _verifyContentType(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders): boolean { + const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; + + if ( + typeof contentType !== 'string' || + !contentType.startsWith('application/grpc') + ) { + stream.respond( + { + [http2.constants.HTTP2_HEADER_STATUS]: + http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, + }, + { endStream: true } + ); + return false } - http2Server.on( - 'stream', - (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { - const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); - if (this.channelzEnabled) { - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); - } - const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; - - if ( - typeof contentType !== 'string' || - !contentType.startsWith('application/grpc') - ) { - stream.respond( - { - [http2.constants.HTTP2_HEADER_STATUS]: - http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, - }, - { endStream: true } - ); - this.callTracker.addCallFailed(); - if (this.channelzEnabled) { - channelzSessionInfo?.streamTracker.addCallFailed(); - } - return; - } + return true + } - let call: Http2ServerCallStream | null = null; + private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler { + const path = headers[HTTP2_HEADER_PATH] as string - try { - const path = headers[http2.constants.HTTP2_HEADER_PATH] as string; - const serverAddress = http2Server.address(); - let serverAddressString = 'null'; - if (serverAddress) { - if (typeof serverAddress === 'string') { - serverAddressString = serverAddress; - } else { - serverAddressString = - serverAddress.address + ':' + serverAddress.port; - } - } - this.trace( - 'Received call to method ' + - path + - ' at address ' + - serverAddressString - ); - const handler = this.handlers.get(path); + this.trace( + 'Received call to method ' + + path + + ' at address ' + + this.serverAddressString + ); - if (handler === undefined) { - this.trace( - 'No handler registered for method ' + - path + - '. Sending UNIMPLEMENTED status.' - ); - throw getUnimplementedStatusResponse(path); - } + const handler = this.handlers.get(path); - call = new Http2ServerCallStream(stream, handler, this.options); - call.once('callEnd', (code: Status) => { - if (code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); - if (this.channelzEnabled && channelzSessionInfo) { - call.once('streamEnd', (success: boolean) => { - if (success) { - channelzSessionInfo.streamTracker.addCallSucceeded(); - } else { - channelzSessionInfo.streamTracker.addCallFailed(); - } - }); - call.on('sendMessage', () => { - channelzSessionInfo.messagesSent += 1; - channelzSessionInfo.lastMessageSentTimestamp = new Date(); - }); - call.on('receiveMessage', () => { - channelzSessionInfo.messagesReceived += 1; - channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); - }); - } - const metadata = call.receiveMetadata(headers); - const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity'; - metadata.remove('grpc-encoding'); - - switch (handler.type) { - case 'unary': - handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding); - break; - case 'clientStream': - handleClientStreaming( - call, - handler as UntypedClientStreamingHandler, - metadata, - encoding - ); - break; - case 'serverStream': - handleServerStreaming( - call, - handler as UntypedServerStreamingHandler, - metadata, - encoding - ); - break; - case 'bidi': - handleBidiStreaming( - call, - handler as UntypedBidiStreamingHandler, - metadata, - encoding - ); - break; - default: - throw new Error(`Unknown handler type: ${handler.type}`); - } - } catch (err) { - if (!call) { - call = new Http2ServerCallStream(stream, null!, this.options); - if (this.channelzEnabled) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed() - } - } + if (handler === undefined) { + this.trace( + 'No handler registered for method ' + + path + + '. Sending UNIMPLEMENTED status.' + ); + throw getUnimplementedStatusResponse(path); + } - if (err.code === undefined) { - err.code = Status.INTERNAL; - } + return handler + } + + private _respondWithError>( + err: T, + stream: http2.ServerHttp2Stream, + channelzSessionInfo: ChannelzSessionInfo | null = null + ) { + const call = new Http2ServerCallStream(stream, null!, this.options); + + if (err.code === undefined) { + err.code = Status.INTERNAL; + } - call.sendError(err); + if (this.channelzEnabled) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed() + } + + call.sendError(err); + } + + private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) { + const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); + + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return + } + + let handler: Handler + try { + handler = this._retrieveHandler(headers) + } catch (err) { + this._respondWithError(err, stream, channelzSessionInfo) + return + } + + const call = new Http2ServerCallStream(stream, handler, this.options); + + call.once('callEnd', (code: Status) => { + if (code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }); + + if (channelzSessionInfo) { + call.once('streamEnd', (success: boolean) => { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); + } else { + channelzSessionInfo.streamTracker.addCallFailed(); } + }); + call.on('sendMessage', () => { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + }); + call.on('receiveMessage', () => { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + }); + } + + if (!this._runHandlerForCall(call, handler, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed() + + call.sendError({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}` + }); + } + } + + private _streamHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) { + if (this._verifyContentType(stream, headers) !== true) { + return + } + + let handler: Handler + try { + handler = this._retrieveHandler(headers) + } catch (err) { + this._respondWithError(err, stream, null) + return + } + + const call = new Http2ServerCallStream(stream, handler, this.options) + if (!this._runHandlerForCall(call, handler, headers)) { + call.sendError({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}` + }); + } + } + + private _runHandlerForCall(call: Http2ServerCallStream, handler: Handler, headers: http2.IncomingHttpHeaders): boolean { + const metadata = call.receiveMetadata(headers); + const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity'; + metadata.remove('grpc-encoding'); + + const { type } = handler + if (type === 'unary') { + handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding); + } else if (type === 'clientStream') { + handleClientStreaming( + call, + handler as UntypedClientStreamingHandler, + metadata, + encoding + ); + } else if (type === 'serverStream') { + handleServerStreaming( + call, + handler as UntypedServerStreamingHandler, + metadata, + encoding + ); + } else if (type === 'bidi') { + handleBidiStreaming( + call, + handler as UntypedBidiStreamingHandler, + metadata, + encoding + ); + } else { + return false + } + + return true + } + + private _setupHandlers( + http2Server: http2.Http2Server | http2.Http2SecureServer + ): void { + if (http2Server === null) { + return; + } + + const serverAddress = http2Server.address(); + let serverAddressString = 'null' + if (serverAddress) { + if (typeof serverAddress === 'string') { + serverAddressString = serverAddress + } else { + serverAddressString = + serverAddress.address + ':' + serverAddress.port } - ); + } + this.serverAddressString = serverAddressString + + const handler = this.channelzEnabled + ? this._channelzHandler + : this._streamHandler + http2Server.on('stream', handler.bind(this)) http2Server.on('session', (session) => { if (!this.started) { session.destroy(); @@ -910,35 +975,40 @@ export class Server { } } -async function handleUnary( +function handleUnary( call: Http2ServerCallStream, handler: UnaryHandler, metadata: Metadata, encoding: string -): Promise { - const request = await call.receiveUnaryMessage(encoding); +): void { + call.receiveUnaryMessage(encoding, (err, request) => { + if (err) { + call.sendError(err) + return + } - if (request === undefined || call.cancelled) { - return; - } + if (request === undefined || call.cancelled) { + return; + } - const emitter = new ServerUnaryCallImpl( - call, - metadata, - request - ); + const emitter = new ServerUnaryCallImpl( + call, + metadata, + request + ); - handler.func( - emitter, - ( - err: ServerErrorResponse | ServerStatusResponse | null, - value?: ResponseType | null, - trailer?: Metadata, - flags?: number - ) => { - call.sendUnaryMessage(err, value, trailer, flags); - } - ); + handler.func( + emitter, + ( + err: ServerErrorResponse | ServerStatusResponse | null, + value?: ResponseType | null, + trailer?: Metadata, + flags?: number + ) => { + call.sendUnaryMessage(err, value, trailer, flags); + } + ); + }); } function handleClientStreaming( @@ -972,26 +1042,31 @@ function handleClientStreaming( handler.func(stream, respond); } -async function handleServerStreaming( +function handleServerStreaming( call: Http2ServerCallStream, handler: ServerStreamingHandler, metadata: Metadata, encoding: string -): Promise { - const request = await call.receiveUnaryMessage(encoding); +): void { + call.receiveUnaryMessage(encoding, (err, request) => { + if (err) { + call.sendError(err) + return + } - if (request === undefined || call.cancelled) { - return; - } + if (request === undefined || call.cancelled) { + return; + } - const stream = new ServerWritableStreamImpl( - call, - metadata, - handler.serialize, - request - ); + const stream = new ServerWritableStreamImpl( + call, + metadata, + handler.serialize, + request + ); - handler.func(stream); + handler.func(stream); + }); } function handleBidiStreaming( From 93de96f490d5683315532ee0bcdbcccff04c5ed3 Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 19 Oct 2022 15:25:42 -0700 Subject: [PATCH 2/2] revert: extra close event on stream --- packages/grpc-js/src/server-call.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 84ff7c8ee..6ddfe13b9 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -443,9 +443,6 @@ export class Http2ServerCallStream< metadata: null, }); } - - // to compensate for a fact that cancelled is not always called - this.emit('close'); }); this.stream.on('drain', () => { @@ -777,9 +774,6 @@ export class Http2ServerCallStream< call.emit('cancelled', reason); }); - // to compensate for the fact that cancelled is no longer always called - this.once('close', () => call.emit('close')) - this.once('callEnd', (status) => call.emit('callEnd', status)); }