Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Optimizing buffer allocation for server server and bidi streams #2658

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 54 additions & 27 deletions packages/grpc-js/src/server-interceptors.ts
Expand Up @@ -39,6 +39,11 @@ function trace(text: string) {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
}

interface GrpcFrame {
infoBytes: Buffer;
payload: Buffer;
}

export interface ServerMetadataListener {
(metadata: Metadata, next: (metadata: Metadata) => void): void;
}
Expand Down Expand Up @@ -613,16 +618,15 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
* @param value
* @returns
*/
private serializeMessage(value: any) {
const messageBuffer = this.handler.serialize(value);
const byteLength = messageBuffer.byteLength;
const output = Buffer.allocUnsafe(byteLength + 5);
private serializeMessage(value: any): GrpcFrame {
const payload = this.handler.serialize(value);

const infoBytes = Buffer.allocUnsafe(5);
/* Note: response compression is currently not supported, so this
* compressed bit is always 0. */
output.writeUInt8(0, 0);
output.writeUInt32BE(byteLength, 1);
messageBuffer.copy(output, 5);
return output;
infoBytes.writeUInt8(0, 0);
infoBytes.writeUInt32BE(payload.byteLength, 1);
return { infoBytes, payload };
}

private decompressMessage(
Expand Down Expand Up @@ -740,47 +744,70 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa
};
this.stream.respond(headers, defaultResponseOptions);
}
private createFrameWriteErrorCallback(
callback: () => void
): (err: unknown) => void {
let errored = false;
let written = 0;
return (error: unknown) => {
if (errored) {
return;
}
if (error) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error writing message: ${getErrorMessage(error)}`,
metadata: null,
});
errored = true;
return;
}
written++;
if (written !== 2) {
return;
}
this.callEventTracker?.addMessageSent();
callback();
};
}
sendMessage(message: any, callback: () => void): void {
if (this.checkCancelled()) {
return;
}
let response: Buffer;
let frame: GrpcFrame;
try {
response = this.serializeMessage(message);
frame = this.serializeMessage(message);
} catch (e) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error serializing response: ${getErrorMessage(e)}`,
metadata: null
metadata: null,
});
return;
}

if (
this.maxSendMessageSize !== -1 &&
response.length - 5 > this.maxSendMessageSize
frame.payload.length > this.maxSendMessageSize
) {
this.sendStatus({
code: Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`,
metadata: null
details: `Sent message larger than max (${frame.payload.length} vs. ${this.maxSendMessageSize})`,
metadata: null,
});
return;
}
this.maybeSendMetadata();
trace('Request to ' + this.handler.path + ' sent data frame of size ' + response.length);
this.stream.write(response, error => {
if (error) {
this.sendStatus({
code: Status.INTERNAL,
details: `Error writing message: ${getErrorMessage(error)}`,
metadata: null
});
return;
}
this.callEventTracker?.addMessageSent();
callback();
});
trace(
'Request to ' +
this.handler.path +
' sent data frame of size ' +
frame.payload.length
);

const cb = this.createFrameWriteErrorCallback(callback);
this.stream.write(frame.infoBytes, cb);
this.stream.write(frame.payload, cb);
}
sendStatus(status: PartialStatusObject): void {
if (this.checkCancelled()) {
Expand Down