Skip to content

Commit

Permalink
Merge pull request #2501 from CedricKassen/master
Browse files Browse the repository at this point in the history
Fix premature leaving of context due to improper Http2ServerCallStream handling
  • Loading branch information
murgatroid99 committed Jul 13, 2023
2 parents fe42d64 + 8ed0a50 commit 9ce6e49
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 94 deletions.
150 changes: 72 additions & 78 deletions packages/grpc-js/src/server-call.ts
Expand Up @@ -553,105 +553,99 @@ export class Http2ServerCallStream<
return metadata;
}

receiveUnaryMessage(
encoding: string,
next: (
err: Partial<ServerStatusResponse> | null,
request?: RequestType
) => void
): void {
const { stream } = this;
receiveUnaryMessage(encoding: string): Promise<RequestType | void> {
return new Promise((resolve, reject) => {
const { stream } = this;

let receivedLength = 0;

// eslint-disable-next-line @typescript-eslint/no-this-alias
const call = this;
const body: Buffer[] = [];
const limit = this.maxReceiveMessageSize;

let receivedLength = 0;
this.stream.on('data', onData);
this.stream.on('end', onEnd);
this.stream.on('error', onEnd);

// eslint-disable-next-line @typescript-eslint/no-this-alias
const call = this;
const body: Buffer[] = [];
const limit = this.maxReceiveMessageSize;
function onData(chunk: Buffer) {
receivedLength += chunk.byteLength;

stream.on('data', onData);
stream.on('end', onEnd);
stream.on('error', onEnd);
if (limit !== -1 && receivedLength > limit) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);

reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
}

function onData(chunk: Buffer) {
receivedLength += chunk.byteLength;
body.push(chunk);
}

if (limit !== -1 && receivedLength > limit) {
function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
next({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
}

body.push(chunk);
}

function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
if (err !== undefined) {
reject({ code: Status.INTERNAL, details: err.message });
return;
}

if (err !== undefined) {
next({ code: Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
reject({
code: Status.INTERNAL,
details: 'received empty unary message',
});
return;
}

if (receivedLength === 0) {
next({
code: Status.INTERNAL,
details: 'received empty unary message',
});
return;
}
call.emit('receiveMessage');

call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);

const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);
if (Buffer.isBuffer(decompressedMessage)) {
resolve(
call.deserializeMessageWithInternalError(decompressedMessage)
);
return;
}

if (Buffer.isBuffer(decompressedMessage)) {
call.safeDeserializeMessage(decompressedMessage, next);
return;
decompressedMessage.then(
decompressed =>
resolve(call.deserializeMessageWithInternalError(decompressed)),
(err: any) =>
reject(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}
)
);
}

decompressedMessage.then(
decompressed => call.safeDeserializeMessage(decompressed, next),
(err: any) =>
next(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}
)
);
}
});
}

private safeDeserializeMessage(
buffer: Buffer,
next: (
err: Partial<ServerStatusResponse> | null,
request?: RequestType
) => void
) {
private async deserializeMessageWithInternalError(buffer: Buffer) {
try {
next(null, this.deserializeMessage(buffer));
return this.deserializeMessage(buffer);
} catch (err) {
next({
throw {
details: getErrorMessage(err),
code: Status.INTERNAL,
});
};
}
}

Expand Down
30 changes: 14 additions & 16 deletions packages/grpc-js/src/server.ts
Expand Up @@ -1176,17 +1176,14 @@ export class Server {
}
}

function handleUnary<RequestType, ResponseType>(
async function handleUnary<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): void {
call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err);
return;
}
): Promise<void> {
try {
const request = await call.receiveUnaryMessage(encoding);

if (request === undefined || call.cancelled) {
return;
Expand All @@ -1209,7 +1206,9 @@ function handleUnary<RequestType, ResponseType>(
call.sendUnaryMessage(err, value, trailer, flags);
}
);
});
} catch (err) {
call.sendError(err as ServerErrorResponse)
}
}

function handleClientStreaming<RequestType, ResponseType>(
Expand Down Expand Up @@ -1243,17 +1242,14 @@ function handleClientStreaming<RequestType, ResponseType>(
handler.func(stream, respond);
}

function handleServerStreaming<RequestType, ResponseType>(
async function handleServerStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): void {
call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err);
return;
}
): Promise<void> {
try {
const request = await call.receiveUnaryMessage(encoding);

if (request === undefined || call.cancelled) {
return;
Expand All @@ -1267,7 +1263,9 @@ function handleServerStreaming<RequestType, ResponseType>(
);

handler.func(stream);
});
} catch (err) {
call.sendError(err as ServerErrorResponse)
}
}

function handleBidiStreaming<RequestType, ResponseType>(
Expand Down

0 comments on commit 9ce6e49

Please sign in to comment.