diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index c03258308..0e1f94d20 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -553,105 +553,99 @@ export class Http2ServerCallStream< return metadata; } - receiveUnaryMessage( - encoding: string, - next: ( - err: Partial | null, - request?: RequestType - ) => void - ): void { - const { stream } = this; + receiveUnaryMessage(encoding: string): Promise { + return new Promise((resolve, reject) => { + const { stream } = this; + + let receivedLength = 0; + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const call = this; + const body: Buffer[] = []; + const limit = this.maxReceiveMessageSize; - let receivedLength = 0; + this.stream.on('data', onData); + this.stream.on('end', onEnd); + this.stream.on('error', onEnd); - // eslint-disable-next-line @typescript-eslint/no-this-alias - const call = this; - const body: Buffer[] = []; - const limit = this.maxReceiveMessageSize; + function onData(chunk: Buffer) { + receivedLength += chunk.byteLength; - stream.on('data', onData); - stream.on('end', onEnd); - stream.on('error', onEnd); + if (limit !== -1 && receivedLength > limit) { + stream.removeListener('data', onData); + stream.removeListener('end', onEnd); + stream.removeListener('error', onEnd); + + reject({ + code: Status.RESOURCE_EXHAUSTED, + details: `Received message larger than max (${receivedLength} vs. ${limit})`, + }); + return; + } - function onData(chunk: Buffer) { - receivedLength += chunk.byteLength; + body.push(chunk); + } - if (limit !== -1 && receivedLength > limit) { + function onEnd(err?: Error) { stream.removeListener('data', onData); stream.removeListener('end', onEnd); stream.removeListener('error', onEnd); - next({ - code: Status.RESOURCE_EXHAUSTED, - details: `Received message larger than max (${receivedLength} vs. ${limit})`, - }); - return; - } - body.push(chunk); - } - - function onEnd(err?: Error) { - stream.removeListener('data', onData); - stream.removeListener('end', onEnd); - stream.removeListener('error', onEnd); + if (err !== undefined) { + reject({ code: Status.INTERNAL, details: err.message }); + return; + } - if (err !== undefined) { - next({ code: Status.INTERNAL, details: err.message }); - return; - } + if (receivedLength === 0) { + reject({ + code: Status.INTERNAL, + details: 'received empty unary message', + }); + return; + } - if (receivedLength === 0) { - next({ - code: Status.INTERNAL, - details: 'received empty unary message', - }); - return; - } + call.emit('receiveMessage'); - call.emit('receiveMessage'); + const requestBytes = Buffer.concat(body, receivedLength); + const compressed = requestBytes.readUInt8(0) === 1; + const compressedMessageEncoding = compressed ? encoding : 'identity'; + const decompressedMessage = call.getDecompressedMessage( + requestBytes, + compressedMessageEncoding + ); - const requestBytes = Buffer.concat(body, receivedLength); - const compressed = requestBytes.readUInt8(0) === 1; - const compressedMessageEncoding = compressed ? encoding : 'identity'; - const decompressedMessage = call.getDecompressedMessage( - requestBytes, - compressedMessageEncoding - ); + if (Buffer.isBuffer(decompressedMessage)) { + resolve( + call.deserializeMessageWithInternalError(decompressedMessage) + ); + return; + } - if (Buffer.isBuffer(decompressedMessage)) { - call.safeDeserializeMessage(decompressedMessage, next); - return; + decompressedMessage.then( + decompressed => + resolve(call.deserializeMessageWithInternalError(decompressed)), + (err: any) => + reject( + err.code + ? err + : { + code: Status.INTERNAL, + details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, + } + ) + ); } - - decompressedMessage.then( - decompressed => call.safeDeserializeMessage(decompressed, next), - (err: any) => - next( - err.code - ? err - : { - code: Status.INTERNAL, - details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, - } - ) - ); - } + }); } - private safeDeserializeMessage( - buffer: Buffer, - next: ( - err: Partial | null, - request?: RequestType - ) => void - ) { + private async deserializeMessageWithInternalError(buffer: Buffer) { try { - next(null, this.deserializeMessage(buffer)); + return this.deserializeMessage(buffer); } catch (err) { - next({ + throw { details: getErrorMessage(err), code: Status.INTERNAL, - }); + }; } } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index b9ad8096d..d1859c395 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1176,17 +1176,14 @@ export class Server { } } -function handleUnary( +async function handleUnary( call: Http2ServerCallStream, handler: UnaryHandler, metadata: Metadata, encoding: string -): void { - call.receiveUnaryMessage(encoding, (err, request) => { - if (err) { - call.sendError(err); - return; - } +): Promise { + try { + const request = await call.receiveUnaryMessage(encoding); if (request === undefined || call.cancelled) { return; @@ -1209,7 +1206,9 @@ function handleUnary( call.sendUnaryMessage(err, value, trailer, flags); } ); - }); + } catch (err) { + call.sendError(err as ServerErrorResponse) + } } function handleClientStreaming( @@ -1243,17 +1242,14 @@ function handleClientStreaming( handler.func(stream, respond); } -function handleServerStreaming( +async function handleServerStreaming( call: Http2ServerCallStream, handler: ServerStreamingHandler, metadata: Metadata, encoding: string -): void { - call.receiveUnaryMessage(encoding, (err, request) => { - if (err) { - call.sendError(err); - return; - } +): Promise { + try { + const request = await call.receiveUnaryMessage(encoding); if (request === undefined || call.cancelled) { return; @@ -1267,7 +1263,9 @@ function handleServerStreaming( ); handler.func(stream); - }); + } catch (err) { + call.sendError(err as ServerErrorResponse) + } } function handleBidiStreaming(