diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index d7151eb6f..488e35fc8 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -97,6 +97,10 @@ export interface StatusObject { metadata: Metadata; } +export type PartialStatusObject = Pick & { + metadata: Metadata | null; +} + export const enum WriteFlags { BufferHint = 1, NoCompress = 2, diff --git a/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts b/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts index ace712454..4c8c18aa7 100644 --- a/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts +++ b/packages/grpc-js/src/generated/grpc/channelz/v1/Channelz.ts @@ -25,102 +25,102 @@ export interface ChannelzClient extends grpc.Client { /** * Returns a single Channel, or else a NOT_FOUND code. */ - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; - GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; + GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Server, or else a NOT_FOUND code. */ - GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Server, or else a NOT_FOUND code. */ - getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; - getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; + getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall; /** * Gets all server sockets that exist in the process. */ - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; /** * Gets all server sockets that exist in the process. */ - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; - getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; + getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall; /** * Gets all servers that exist in the process. */ - GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; /** * Gets all servers that exist in the process. */ - getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; - getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; + getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Socket or else a NOT_FOUND code. */ - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Socket or else a NOT_FOUND code. */ - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; - getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; + getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Subchannel, or else a NOT_FOUND code. */ - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; /** * Returns a single Subchannel, or else a NOT_FOUND code. */ - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; - getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; + getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall; /** * Gets all root channels (i.e. channels the application has directly * created). This does not include subchannels nor non-top level channels. */ - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; /** * Gets all root channels (i.e. channels the application has directly * created). This does not include subchannels nor non-top level channels. */ - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; - getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; + getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall; } diff --git a/packages/grpc-js/src/metadata.ts b/packages/grpc-js/src/metadata.ts index 04db642ef..376cb491a 100644 --- a/packages/grpc-js/src/metadata.ts +++ b/packages/grpc-js/src/metadata.ts @@ -48,13 +48,14 @@ function validate(key: string, value?: MetadataValue): void { if (!isLegalKey(key)) { throw new Error('Metadata key "' + key + '" contains illegal characters'); } + if (value !== null && value !== undefined) { if (isBinaryKey(key)) { - if (!(value instanceof Buffer)) { + if (!Buffer.isBuffer(value)) { throw new Error("keys that end with '-bin' must have Buffer values"); } } else { - if (value instanceof Buffer) { + if (Buffer.isBuffer(value)) { throw new Error( "keys that don't end with '-bin' must have String values" ); @@ -88,12 +89,8 @@ export class Metadata { protected internalRepr: MetadataObject = new Map(); private options: MetadataOptions; - constructor(options?: MetadataOptions) { - if (options === undefined) { - this.options = {}; - } else { - this.options = options; - } + constructor(options: MetadataOptions = {}) { + this.options = options; } /** @@ -120,9 +117,7 @@ export class Metadata { key = normalizeKey(key); validate(key, value); - const existingValue: MetadataValue[] | undefined = this.internalRepr.get( - key - ); + const existingValue: MetadataValue[] | undefined = this.internalRepr.get(key); if (existingValue === undefined) { this.internalRepr.set(key, [value]); @@ -137,7 +132,7 @@ export class Metadata { */ remove(key: string): void { key = normalizeKey(key); - validate(key); + // validate(key); this.internalRepr.delete(key); } @@ -148,7 +143,7 @@ export class Metadata { */ get(key: string): MetadataValue[] { key = normalizeKey(key); - validate(key); + // validate(key); return this.internalRepr.get(key) || []; } @@ -160,12 +155,12 @@ export class Metadata { getMap(): { [key: string]: MetadataValue } { const result: { [key: string]: MetadataValue } = {}; - this.internalRepr.forEach((values, key) => { + for (const [key, values] of this.internalRepr) { if (values.length > 0) { const v = values[0]; - result[key] = v instanceof Buffer ? v.slice() : v; + result[key] = Buffer.isBuffer(v) ? Buffer.from(v) : v; } - }); + } return result; } @@ -177,9 +172,9 @@ export class Metadata { const newMetadata = new Metadata(this.options); const newInternalRepr = newMetadata.internalRepr; - this.internalRepr.forEach((value, key) => { + for (const [key, value] of this.internalRepr) { const clonedValue: MetadataValue[] = value.map((v) => { - if (v instanceof Buffer) { + if (Buffer.isBuffer(v)) { return Buffer.from(v); } else { return v; @@ -187,7 +182,7 @@ export class Metadata { }); newInternalRepr.set(key, clonedValue); - }); + } return newMetadata; } @@ -200,13 +195,13 @@ export class Metadata { * @param other A Metadata object. */ merge(other: Metadata): void { - other.internalRepr.forEach((values, key) => { + for (const [key, values] of other.internalRepr) { const mergedValue: MetadataValue[] = ( this.internalRepr.get(key) || [] ).concat(values); this.internalRepr.set(key, mergedValue); - }); + } } setOptions(options: MetadataOptions) { @@ -223,17 +218,13 @@ export class Metadata { toHttp2Headers(): http2.OutgoingHttpHeaders { // NOTE: Node <8.9 formats http2 headers incorrectly. const result: http2.OutgoingHttpHeaders = {}; - this.internalRepr.forEach((values, key) => { + + for (const [key, values] of this.internalRepr) { // We assume that the user's interaction with this object is limited to // through its public API (i.e. keys and values are already validated). - result[key] = values.map((value) => { - if (value instanceof Buffer) { - return value.toString('base64'); - } else { - return value; - } - }); - }); + result[key] = values.map(bufToString); + } + return result; } @@ -248,7 +239,7 @@ export class Metadata { */ toJSON() { const result: { [key: string]: MetadataValue[] } = {}; - for (const [key, values] of this.internalRepr.entries()) { + for (const [key, values] of this.internalRepr) { result[key] = values; } return result; @@ -261,10 +252,10 @@ export class Metadata { */ static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata { const result = new Metadata(); - Object.keys(headers).forEach((key) => { + for (const key of Object.keys(headers)) { // Reserved headers (beginning with `:`) are not valid keys. if (key.charAt(0) === ':') { - return; + continue; } const values = headers[key]; @@ -297,7 +288,12 @@ export class Metadata { const message = `Failed to add metadata entry ${key}: ${values}. ${error.message}. For more information see https://github.com/grpc/grpc-node/issues/1173`; log(LogVerbosity.ERROR, message); } - }); + } + return result; } } + +const bufToString = (val: string | Buffer): string => { + return Buffer.isBuffer(val) ? val.toString('base64') : val +}; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 1f7f3eb04..6ddfe13b9 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,8 +19,9 @@ import { EventEmitter } from 'events'; import * as http2 from 'http2'; import { Duplex, Readable, Writable } from 'stream'; import * as zlib from 'zlib'; +import { promisify } from 'util'; -import { Deadline, StatusObject } from './call-stream'; +import { Deadline, StatusObject, PartialStatusObject } from './call-stream'; import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, @@ -35,6 +36,8 @@ import { ChannelOptions } from './channel-options'; import * as logging from './logging'; const TRACER_NAME = 'server_call'; +const unzip = promisify(zlib.unzip); +const inflate = promisify(zlib.inflate); function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); @@ -86,25 +89,22 @@ export type ServerSurfaceCall = { export type ServerUnaryCall = ServerSurfaceCall & { request: RequestType; }; -export type ServerReadableStream< - RequestType, - ResponseType -> = ServerSurfaceCall & ObjectReadable; -export type ServerWritableStream< - RequestType, - ResponseType -> = ServerSurfaceCall & - ObjectWritable & { - request: RequestType; - end: (metadata?: Metadata) => void; - }; +export type ServerReadableStream = + ServerSurfaceCall & ObjectReadable; +export type ServerWritableStream = + ServerSurfaceCall & + ObjectWritable & { + request: RequestType; + end: (metadata?: Metadata) => void; + }; export type ServerDuplexStream = ServerSurfaceCall & ObjectReadable & ObjectWritable & { end: (metadata?: Metadata) => void }; export class ServerUnaryCallImpl extends EventEmitter - implements ServerUnaryCall { + implements ServerUnaryCall +{ cancelled: boolean; constructor( @@ -136,7 +136,8 @@ export class ServerUnaryCallImpl export class ServerReadableStreamImpl extends Readable - implements ServerReadableStream { + implements ServerReadableStream +{ cancelled: boolean; constructor( @@ -178,7 +179,8 @@ export class ServerReadableStreamImpl export class ServerWritableStreamImpl extends Writable - implements ServerWritableStream { + implements ServerWritableStream +{ cancelled: boolean; private trailingMetadata: Metadata; @@ -257,7 +259,8 @@ export class ServerWritableStreamImpl export class ServerDuplexStreamImpl extends Duplex - implements ServerDuplexStream { + implements ServerDuplexStream +{ cancelled: boolean; private trailingMetadata: Metadata; @@ -395,7 +398,8 @@ export class Http2ServerCallStream< ResponseType > extends EventEmitter { cancelled = false; - deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); + deadlineTimer: NodeJS.Timer | null = null; + private statusSent = false; private deadline: Deadline = Infinity; private wantTrailers = false; private metadataSent = false; @@ -428,10 +432,17 @@ export class Http2ServerCallStream< ' stream closed with rstCode ' + this.stream.rstCode ); - this.cancelled = true; - this.emit('cancelled', 'cancelled'); - this.emit('streamEnd', false); - this.sendStatus({code: Status.CANCELLED, details: 'Cancelled by client', metadata: new Metadata()}); + + if (!this.statusSent) { + this.cancelled = true; + this.emit('cancelled', 'cancelled'); + this.emit('streamEnd', false); + this.sendStatus({ + code: Status.CANCELLED, + details: 'Cancelled by client', + metadata: null, + }); + } }); this.stream.on('drain', () => { @@ -444,9 +455,6 @@ export class Http2ServerCallStream< if ('grpc.max_receive_message_length' in options) { this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; } - - // Clear noop timer - clearTimeout(this.deadlineTimer); } private checkCancelled(): boolean { @@ -458,52 +466,22 @@ export class Http2ServerCallStream< return this.cancelled; } - private getDecompressedMessage(message: Buffer, encoding: string) { - switch (encoding) { - case 'deflate': { - return new Promise((resolve, reject) => { - zlib.inflate(message.slice(5), (err, output) => { - if (err) { - this.sendError({ - code: Status.INTERNAL, - details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, - }); - resolve(); - } else { - resolve(output); - } - }); - }); - } - - case 'gzip': { - return new Promise((resolve, reject) => { - zlib.unzip(message.slice(5), (err, output) => { - if (err) { - this.sendError({ - code: Status.INTERNAL, - details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, - }); - resolve(); - } else { - resolve(output); - } - }); - }); - } - - case 'identity': { - return Promise.resolve(message.slice(5)); - } - - default: { - this.sendError({ - code: Status.UNIMPLEMENTED, - details: `Received message compressed with unsupported encoding "${encoding}"`, - }); - return Promise.resolve(); - } + private getDecompressedMessage( + message: Buffer, + encoding: string + ): Buffer | Promise { + if (encoding === 'deflate') { + return inflate(message.subarray(5)); + } else if (encoding === 'gzip') { + return unzip(message.subarray(5)); + } else if (encoding === 'identity') { + return message.subarray(5); } + + return Promise.reject({ + code: Status.UNIMPLEMENTED, + details: `Received message compressed with unsupported encoding "${encoding}"`, + }); } sendMetadata(customMetadata?: Metadata) { @@ -518,13 +496,22 @@ export class Http2ServerCallStream< this.metadataSent = true; const custom = customMetadata ? customMetadata.toHttp2Headers() : null; // TODO(cjihrig): Include compression headers. - const headers = Object.assign({}, defaultResponseHeaders, custom); + const headers = { ...defaultResponseHeaders, ...custom }; this.stream.respond(headers, defaultResponseOptions); } receiveMetadata(headers: http2.IncomingHttpHeaders) { const metadata = Metadata.fromHttp2Headers(headers); + if (logging.isTracerEnabled(TRACER_NAME)) { + trace( + 'Request to ' + + this.handler.path + + ' received headers ' + + JSON.stringify(metadata.toJSON()) + ); + } + // TODO(cjihrig): Receive compression metadata. const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER); @@ -556,52 +543,95 @@ export class Http2ServerCallStream< return metadata; } - receiveUnaryMessage(encoding: string): Promise { - return new Promise((resolve, reject) => { - const stream = this.stream; - const chunks: Buffer[] = []; - let totalLength = 0; + receiveUnaryMessage( + encoding: string, + next: ( + err: Partial | null, + request?: RequestType + ) => void + ): void { + const { stream } = this; + + let receivedLength = 0; + const call = this; + const body: Buffer[] = []; + const limit = this.maxReceiveMessageSize; + + stream.on('data', onData); + stream.on('end', onEnd); + stream.on('error', onEnd); + + function onData(chunk: Buffer) { + receivedLength += chunk.byteLength; + + if (limit !== -1 && receivedLength > limit) { + stream.removeListener('data', onData); + stream.removeListener('end', onEnd); + stream.removeListener('error', onEnd); + next({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message larger than max (${receivedLength} vs. ${limit})`, + }); + return; + } - stream.on('data', (data: Buffer) => { - chunks.push(data); - totalLength += data.byteLength; - }); + body.push(chunk); + } - stream.once('end', async () => { - try { - const requestBytes = Buffer.concat(chunks, totalLength); - if ( - this.maxReceiveMessageSize !== -1 && - requestBytes.length > this.maxReceiveMessageSize - ) { - this.sendError({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`, - }); - resolve(); - } - - this.emit('receiveMessage'); - - const compressed = requestBytes.readUInt8(0) === 1; - const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding); - - // Encountered an error with decompression; it'll already have been propogated back - // Just return early - if (!decompressedMessage) { - resolve(); - } - else { - resolve(this.deserializeMessage(decompressedMessage)); - } - } catch (err) { - err.code = Status.INTERNAL; - this.sendError(err); - resolve(); - } - }); - }); + function onEnd(err?: Error) { + stream.removeListener('data', onData); + stream.removeListener('end', onEnd); + stream.removeListener('error', onEnd); + + if (err !== undefined) { + next({ code: Status.INTERNAL, details: err.message }); + return; + } + + if (receivedLength === 0) { + next({ code: Status.INTERNAL, details: 'received empty unary message' }) + return; + } + + call.emit('receiveMessage'); + + const requestBytes = Buffer.concat(body, receivedLength); + const compressed = requestBytes.readUInt8(0) === 1; + const compressedMessageEncoding = compressed ? encoding : 'identity'; + const decompressedMessage = call.getDecompressedMessage( + requestBytes, + compressedMessageEncoding + ); + + if (Buffer.isBuffer(decompressedMessage)) { + call.safeDeserializeMessage(decompressedMessage, next); + return; + } + + decompressedMessage.then( + (decompressed) => call.safeDeserializeMessage(decompressed, next), + (err: any) => next( + err.code + ? err + : { + code: Status.INTERNAL, + details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, + } + ) + ) + } + } + + private safeDeserializeMessage( + buffer: Buffer, + next: (err: Partial | null, request?: RequestType) => void + ) { + try { + next(null, this.deserializeMessage(buffer)); + } catch (err) { + err.code = Status.INTERNAL; + next(err); + } } serializeMessage(value: ResponseType) { @@ -623,18 +653,19 @@ export class Http2ServerCallStream< async sendUnaryMessage( err: ServerErrorResponse | ServerStatusResponse | null, value?: ResponseType | null, - metadata?: Metadata, + metadata?: Metadata | null, flags?: number ) { if (this.checkCancelled()) { return; } - if (!metadata) { - metadata = new Metadata(); + + if (metadata === undefined) { + metadata = null; } if (err) { - if (!Object.prototype.hasOwnProperty.call(err, 'metadata')) { + if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) { err.metadata = metadata; } this.sendError(err); @@ -652,7 +683,7 @@ export class Http2ServerCallStream< } } - sendStatus(statusObj: StatusObject) { + sendStatus(statusObj: PartialStatusObject) { this.emit('callEnd', statusObj.code); this.emit('streamEnd', statusObj.code === Status.OK); if (this.checkCancelled()) { @@ -668,20 +699,19 @@ export class Http2ServerCallStream< statusObj.details ); - clearTimeout(this.deadlineTimer); + if (this.deadlineTimer) clearTimeout(this.deadlineTimer); if (!this.wantTrailers) { this.wantTrailers = true; this.stream.once('wantTrailers', () => { - const trailersToSend = Object.assign( - { - [GRPC_STATUS_HEADER]: statusObj.code, - [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string), - }, - statusObj.metadata.toHttp2Headers() - ); + const trailersToSend = { + [GRPC_STATUS_HEADER]: statusObj.code, + [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details), + ...statusObj.metadata?.toHttp2Headers(), + }; this.stream.sendTrailers(trailersToSend); + this.statusSent = true; }); this.sendMetadata(); this.stream.end(); @@ -689,13 +719,13 @@ export class Http2ServerCallStream< } sendError(error: ServerErrorResponse | ServerStatusResponse) { - const status: StatusObject = { + const status: PartialStatusObject = { code: Status.UNKNOWN, details: 'message' in error ? error.message : 'Unknown Error', metadata: 'metadata' in error && error.metadata !== undefined ? error.metadata - : new Metadata(), + : null, }; if ( @@ -766,7 +796,7 @@ export class Http2ServerCallStream< pushedEnd = true; this.pushOrBufferMessage(readable, null); } - } + }; this.stream.on('data', async (data: Buffer) => { const messages = decoder.write(data); @@ -788,12 +818,15 @@ export class Http2ServerCallStream< const compressed = message.readUInt8(0) === 1; const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding); + const decompressedMessage = await this.getDecompressedMessage( + message, + compressedMessageEncoding + ); // Encountered an error with decompression; it'll already have been propogated back // Just return early if (!decompressedMessage) return; - + this.pushOrBufferMessage(readable, decompressedMessage); } pendingMessageProcessing = false; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 499fde4a3..2e89f459b 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -62,6 +62,10 @@ import { parseUri } from './uri-parser'; import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz'; import { CipherNameAndProtocol, TLSSocket } from 'tls'; +const { + HTTP2_HEADER_PATH +} = http2.constants + const TRACER_NAME = 'server'; interface BindResult { @@ -77,7 +81,6 @@ function getUnimplementedStatusResponse( return { code: Status.UNIMPLEMENTED, details: `The server does not implement the method ${methodName}`, - metadata: new Metadata(), }; } @@ -147,6 +150,7 @@ export class Server { private sessions = new Map(); private started = false; private options: ChannelOptions; + private serverAddressString: string = 'null' // Channelz Info private readonly channelzEnabled: boolean = true; @@ -165,6 +169,7 @@ export class Server { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Server created'); } + this.trace('Server constructed'); } @@ -730,150 +735,210 @@ export class Server { return this.channelzRef; } - private _setupHandlers( - http2Server: http2.Http2Server | http2.Http2SecureServer - ): void { - if (http2Server === null) { - return; + private _verifyContentType(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders): boolean { + const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; + + if ( + typeof contentType !== 'string' || + !contentType.startsWith('application/grpc') + ) { + stream.respond( + { + [http2.constants.HTTP2_HEADER_STATUS]: + http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, + }, + { endStream: true } + ); + return false } - http2Server.on( - 'stream', - (stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => { - const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); - if (this.channelzEnabled) { - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); - } - const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; - - if ( - typeof contentType !== 'string' || - !contentType.startsWith('application/grpc') - ) { - stream.respond( - { - [http2.constants.HTTP2_HEADER_STATUS]: - http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, - }, - { endStream: true } - ); - this.callTracker.addCallFailed(); - if (this.channelzEnabled) { - channelzSessionInfo?.streamTracker.addCallFailed(); - } - return; - } + return true + } - let call: Http2ServerCallStream | null = null; + private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler { + const path = headers[HTTP2_HEADER_PATH] as string - try { - const path = headers[http2.constants.HTTP2_HEADER_PATH] as string; - const serverAddress = http2Server.address(); - let serverAddressString = 'null'; - if (serverAddress) { - if (typeof serverAddress === 'string') { - serverAddressString = serverAddress; - } else { - serverAddressString = - serverAddress.address + ':' + serverAddress.port; - } - } - this.trace( - 'Received call to method ' + - path + - ' at address ' + - serverAddressString - ); - const handler = this.handlers.get(path); + this.trace( + 'Received call to method ' + + path + + ' at address ' + + this.serverAddressString + ); - if (handler === undefined) { - this.trace( - 'No handler registered for method ' + - path + - '. Sending UNIMPLEMENTED status.' - ); - throw getUnimplementedStatusResponse(path); - } + const handler = this.handlers.get(path); - call = new Http2ServerCallStream(stream, handler, this.options); - call.once('callEnd', (code: Status) => { - if (code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }); - if (this.channelzEnabled && channelzSessionInfo) { - call.once('streamEnd', (success: boolean) => { - if (success) { - channelzSessionInfo.streamTracker.addCallSucceeded(); - } else { - channelzSessionInfo.streamTracker.addCallFailed(); - } - }); - call.on('sendMessage', () => { - channelzSessionInfo.messagesSent += 1; - channelzSessionInfo.lastMessageSentTimestamp = new Date(); - }); - call.on('receiveMessage', () => { - channelzSessionInfo.messagesReceived += 1; - channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); - }); - } - const metadata = call.receiveMetadata(headers); - const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity'; - metadata.remove('grpc-encoding'); - - switch (handler.type) { - case 'unary': - handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding); - break; - case 'clientStream': - handleClientStreaming( - call, - handler as UntypedClientStreamingHandler, - metadata, - encoding - ); - break; - case 'serverStream': - handleServerStreaming( - call, - handler as UntypedServerStreamingHandler, - metadata, - encoding - ); - break; - case 'bidi': - handleBidiStreaming( - call, - handler as UntypedBidiStreamingHandler, - metadata, - encoding - ); - break; - default: - throw new Error(`Unknown handler type: ${handler.type}`); - } - } catch (err) { - if (!call) { - call = new Http2ServerCallStream(stream, null!, this.options); - if (this.channelzEnabled) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed() - } - } + if (handler === undefined) { + this.trace( + 'No handler registered for method ' + + path + + '. Sending UNIMPLEMENTED status.' + ); + throw getUnimplementedStatusResponse(path); + } - if (err.code === undefined) { - err.code = Status.INTERNAL; - } + return handler + } + + private _respondWithError>( + err: T, + stream: http2.ServerHttp2Stream, + channelzSessionInfo: ChannelzSessionInfo | null = null + ) { + const call = new Http2ServerCallStream(stream, null!, this.options); + + if (err.code === undefined) { + err.code = Status.INTERNAL; + } - call.sendError(err); + if (this.channelzEnabled) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed() + } + + call.sendError(err); + } + + private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) { + const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session); + + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return + } + + let handler: Handler + try { + handler = this._retrieveHandler(headers) + } catch (err) { + this._respondWithError(err, stream, channelzSessionInfo) + return + } + + const call = new Http2ServerCallStream(stream, handler, this.options); + + call.once('callEnd', (code: Status) => { + if (code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }); + + if (channelzSessionInfo) { + call.once('streamEnd', (success: boolean) => { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); + } else { + channelzSessionInfo.streamTracker.addCallFailed(); } + }); + call.on('sendMessage', () => { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + }); + call.on('receiveMessage', () => { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + }); + } + + if (!this._runHandlerForCall(call, handler, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed() + + call.sendError({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}` + }); + } + } + + private _streamHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) { + if (this._verifyContentType(stream, headers) !== true) { + return + } + + let handler: Handler + try { + handler = this._retrieveHandler(headers) + } catch (err) { + this._respondWithError(err, stream, null) + return + } + + const call = new Http2ServerCallStream(stream, handler, this.options) + if (!this._runHandlerForCall(call, handler, headers)) { + call.sendError({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}` + }); + } + } + + private _runHandlerForCall(call: Http2ServerCallStream, handler: Handler, headers: http2.IncomingHttpHeaders): boolean { + const metadata = call.receiveMetadata(headers); + const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity'; + metadata.remove('grpc-encoding'); + + const { type } = handler + if (type === 'unary') { + handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding); + } else if (type === 'clientStream') { + handleClientStreaming( + call, + handler as UntypedClientStreamingHandler, + metadata, + encoding + ); + } else if (type === 'serverStream') { + handleServerStreaming( + call, + handler as UntypedServerStreamingHandler, + metadata, + encoding + ); + } else if (type === 'bidi') { + handleBidiStreaming( + call, + handler as UntypedBidiStreamingHandler, + metadata, + encoding + ); + } else { + return false + } + + return true + } + + private _setupHandlers( + http2Server: http2.Http2Server | http2.Http2SecureServer + ): void { + if (http2Server === null) { + return; + } + + const serverAddress = http2Server.address(); + let serverAddressString = 'null' + if (serverAddress) { + if (typeof serverAddress === 'string') { + serverAddressString = serverAddress + } else { + serverAddressString = + serverAddress.address + ':' + serverAddress.port } - ); + } + this.serverAddressString = serverAddressString + + const handler = this.channelzEnabled + ? this._channelzHandler + : this._streamHandler + http2Server.on('stream', handler.bind(this)) http2Server.on('session', (session) => { if (!this.started) { session.destroy(); @@ -910,35 +975,40 @@ export class Server { } } -async function handleUnary( +function handleUnary( call: Http2ServerCallStream, handler: UnaryHandler, metadata: Metadata, encoding: string -): Promise { - const request = await call.receiveUnaryMessage(encoding); +): void { + call.receiveUnaryMessage(encoding, (err, request) => { + if (err) { + call.sendError(err) + return + } - if (request === undefined || call.cancelled) { - return; - } + if (request === undefined || call.cancelled) { + return; + } - const emitter = new ServerUnaryCallImpl( - call, - metadata, - request - ); + const emitter = new ServerUnaryCallImpl( + call, + metadata, + request + ); - handler.func( - emitter, - ( - err: ServerErrorResponse | ServerStatusResponse | null, - value?: ResponseType | null, - trailer?: Metadata, - flags?: number - ) => { - call.sendUnaryMessage(err, value, trailer, flags); - } - ); + handler.func( + emitter, + ( + err: ServerErrorResponse | ServerStatusResponse | null, + value?: ResponseType | null, + trailer?: Metadata, + flags?: number + ) => { + call.sendUnaryMessage(err, value, trailer, flags); + } + ); + }); } function handleClientStreaming( @@ -972,26 +1042,31 @@ function handleClientStreaming( handler.func(stream, respond); } -async function handleServerStreaming( +function handleServerStreaming( call: Http2ServerCallStream, handler: ServerStreamingHandler, metadata: Metadata, encoding: string -): Promise { - const request = await call.receiveUnaryMessage(encoding); +): void { + call.receiveUnaryMessage(encoding, (err, request) => { + if (err) { + call.sendError(err) + return + } - if (request === undefined || call.cancelled) { - return; - } + if (request === undefined || call.cancelled) { + return; + } - const stream = new ServerWritableStreamImpl( - call, - metadata, - handler.serialize, - request - ); + const stream = new ServerWritableStreamImpl( + call, + metadata, + handler.serialize, + request + ); - handler.func(stream); + handler.func(stream); + }); } function handleBidiStreaming(