Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Allow per-channel request compression from the client and decompression from the server #1952

Merged
merged 26 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7aa5b62
grpc-js: Allow per-channel request compression from the client and de…
b0b3rt Oct 25, 2021
cec7e64
cleanup test file
b0b3rt Oct 25, 2021
b2ebbee
revert mistaken accept-encoding client header change
b0b3rt Oct 26, 2021
16f1356
remove unused compression algorithm key value
b0b3rt Oct 26, 2021
9d70f39
remove copied test proto file and modify npm command
b0b3rt Oct 26, 2021
959503e
rewrite CompressionAlgorithms to use numeric keys and export it
b0b3rt Oct 26, 2021
af01007
have client restore default sendCompression if server doesnt support …
b0b3rt Oct 26, 2021
d68d94a
re-enable NoCompress flag behavior and check Compressed Flag byte on …
b0b3rt Oct 26, 2021
6d75ea1
change encoding type from MetadataValue to string
b0b3rt Oct 26, 2021
21b09e2
separate compression algorithms to avoid circular dependency
b0b3rt Oct 26, 2021
e03c115
log a warning when invalid value for grpc.default_compression_algorit…
b0b3rt Oct 26, 2021
7a31b4a
change test to use write instead of _write
b0b3rt Oct 26, 2021
25a1806
change year in new copyright heade
b0b3rt Oct 26, 2021
d01ff79
Update error message for invalid default_compression_algorithm
b0b3rt Oct 26, 2021
6ba0081
only attempt to use custom compression filter when option value is pr…
b0b3rt Oct 26, 2021
53a2347
only attempt to use custom compression filter when option value is pr…
b0b3rt Oct 26, 2021
c9659b5
persist server supported encoding header across compression filter in…
b0b3rt Oct 29, 2021
47bb8b6
move string split down to where it matters
b0b3rt Oct 29, 2021
9150bdf
default to identity header everywhere instead of leaving it undefined
b0b3rt Oct 30, 2021
5c61a6a
Update packages/grpc-js/src/server-call.ts
b0b3rt Nov 2, 2021
dc9752a
change CompressionAlgorithms to an enum
b0b3rt Nov 2, 2021
deff9d4
Merge branch 'grpc-js_compression_support' of github.com:b0b3rt/grpc-…
b0b3rt Nov 2, 2021
c4d7fab
simplify compression filter handling of server supported encoding hea…
b0b3rt Nov 3, 2021
96ae102
fix path for loading test_service proto
b0b3rt Nov 6, 2021
9cdd36f
Merge branch 'master' into grpc-js_compression_support
murgatroid99 Nov 8, 2021
69428b0
simplify removal of compression prefix bytes
b0b3rt Nov 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions packages/grpc-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
"test": "gulp test",
"check": "gts check src/**/*.ts",
"fix": "gts fix src/*.ts",
"pretest": "npm run generate-types && npm run compile",
"pretest": "npm run generate-types && npm run generate-test-types && npm run compile",
"posttest": "npm run check && madge -c ./build/src",
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs proto/ -O src/generated/ --grpcLib ../index channelz.proto"
"generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs proto/ --include-dirs test/fixtures/ -O src/generated/ --grpcLib ../index channelz.proto",
"generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto"
},
"dependencies": {
"@grpc/proto-loader": "^0.6.4",
"@types/node": ">=12.12.47"
"@types/node": "16.10.0",
"@types/semver": "^7.3.9"
},
"files": [
"src/**/*.ts",
Expand Down
1 change: 1 addition & 0 deletions packages/grpc-js/src/channel-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface ChannelOptions {
'grpc.enable_http_proxy'?: number;
'grpc.http_connect_target'?: string;
'grpc.http_connect_creds'?: string;
'grpc.default_compression_algorithm'?: number;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@murgatroid99 what do you think of changing this from number to keyof typeof CompressionAlgorithms (resulting in 0 | 1 | 2)? That'll probably be more legible to consumers of the library than just exporting CompressionAlgorithms.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems reasonable to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done. I also moved CompressionAlgorithms into its own file to avoid a circular dependency.

'grpc-node.max_session_memory'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
Expand Down
3 changes: 1 addition & 2 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
import { SurfaceCall } from './call';
import { Filter } from './filter';

import { ConnectivityState } from './connectivity-state';
Expand Down Expand Up @@ -309,7 +308,7 @@ export class ChannelImplementation implements Channel {
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new MaxMessageSizeFilterFactory(this.options),
new CompressionFilterFactory(this),
new CompressionFilterFactory(this, this.options),
]);
this.trace('Channel constructed with options ' + JSON.stringify(options, undefined, 2));
}
Expand Down
54 changes: 50 additions & 4 deletions packages/grpc-js/src/compression-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,25 @@

import * as zlib from 'zlib';

import { Call, WriteFlags, WriteObject } from './call-stream';
import { Call, WriteObject, WriteFlags } from './call-stream';
import { Channel } from './channel';
import { BaseFilter, Filter, FilterFactory } from './filter';
import { Metadata, MetadataValue } from './metadata';
import { ChannelOptions } from './channel-options';

export const CompressionAlgorithms = {
0: 'identity',
1: 'deflate',
2: 'gzip'
} as const;

const CompressionAlgorithKeys = new Set(Object.keys(CompressionAlgorithms));

const isCompressionAlgorithmKey = (key: number | undefined): key is keyof typeof CompressionAlgorithms => {
return typeof key === 'number' && CompressionAlgorithKeys.has(key.toString());
}

type CompressionAlgorithm = (typeof CompressionAlgorithms)[keyof typeof CompressionAlgorithms];

abstract class CompressionHandler {
protected abstract compressMessage(message: Buffer): Promise<Buffer>;
Expand Down Expand Up @@ -167,10 +182,29 @@ function getCompressionHandler(compressionName: string): CompressionHandler {
export class CompressionFilter extends BaseFilter implements Filter {
private sendCompression: CompressionHandler = new IdentityHandler();
private receiveCompression: CompressionHandler = new IdentityHandler();
private defaultCompressionAlgorithm: CompressionAlgorithm | undefined;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point this field would more accurately be called currentCompressionAlgorithm, since it's always set to the currently used algorithm, not the default.

Also, please remove | undefined from the type, and use "identity" instead of undefined as the default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.


constructor(channelOptions: ChannelOptions) {
super();

const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm'];
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
this.defaultCompressionAlgorithm = CompressionAlgorithms[compressionAlgorithmKey];
this.sendCompression = getCompressionHandler(this.defaultCompressionAlgorithm);
}
murgatroid99 marked this conversation as resolved.
Show resolved Hide resolved
}

async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> {
const headers: Metadata = await metadata;
headers.set('grpc-accept-encoding', 'identity,deflate,gzip');
headers.set('accept-encoding', 'identity');

if (this.defaultCompressionAlgorithm && ['deflate', 'gzip'].includes(this.defaultCompressionAlgorithm)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the explicit check for "deflate" and "gzip" here. It's effectively duplicating information in the CompressionAlgorithm type and CompressionAlgorithms object. It's perfectly valid to not have a check at all here, and to send "identity` when that is the value. Particularly with the other change, that would allow this code to have no check at all. Alternatively, I would be OK with explicitly excluding "identity", because that is the actual special case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good call, there's no reason to have this check anymore. I think explicitly excluding identity does make sense, if only to save on the bandwidth.

headers.set('grpc-encoding', this.defaultCompressionAlgorithm);
} else {
headers.remove('grpc-encoding');
}

return headers;
}

Expand All @@ -183,6 +217,18 @@ export class CompressionFilter extends BaseFilter implements Filter {
}
}
metadata.remove('grpc-encoding');

/* Check to see if the compression we're using to send messages is supported by the server
* If not, reset the sendCompression filter and have it use the default IdentityHandler */
const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0] as string | undefined;
if (serverSupportedEncodingsHeader) {
const serverSupportedEncodings = serverSupportedEncodingsHeader.split(',');

if ((this.sendCompression instanceof DeflateHandler && !serverSupportedEncodings.includes('deflate'))
|| (this.sendCompression instanceof GzipHandler && !serverSupportedEncodings.includes('gzip'))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this whole if statement can be reduced to if (!serverSupportedEncodings.includes(this.currentCompressionAlgorithm)), and it should also set this.currentCompressionAlgorithm.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, simplified (and added).

this.sendCompression = new IdentityHandler();
}
}
metadata.remove('grpc-accept-encoding');
return metadata;
}
Expand All @@ -194,7 +240,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
const resolvedMessage: WriteObject = await message;
const compress =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this assignment needs to be expanded out at this point. It's three conditionals and too many negatives, and I think the ordering is a little confusing. I think it would be better for the handler check to be the primary thing, and separate from the flags check. So, something like this:

if (this.sendCompression instanceof IdentityHandler) {
  compress = false;
} else {
  compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a bit too much for a ternary expression. Split it out as suggested.

resolvedMessage.flags === undefined
? false
? !(this.sendCompression instanceof IdentityHandler)
: (resolvedMessage.flags & WriteFlags.NoCompress) === 0;
return {
message: await this.sendCompression.writeMessage(
Expand All @@ -216,8 +262,8 @@ export class CompressionFilter extends BaseFilter implements Filter {

export class CompressionFilterFactory
implements FilterFactory<CompressionFilter> {
constructor(private readonly channel: Channel) {}
constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {}
createFilter(callStream: Call): CompressionFilter {
return new CompressionFilter();
return new CompressionFilter(this.options);
}
}
2 changes: 2 additions & 0 deletions packages/grpc-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
import { CallCredentials, OAuth2Client } from './call-credentials';
import { Deadline, StatusObject } from './call-stream';
import { Channel, ChannelImplementation } from './channel';
import { CompressionAlgorithms } from './compression-filter';
import { ConnectivityState } from './connectivity-state';
import { ChannelCredentials } from './channel-credentials';
import {
Expand Down Expand Up @@ -124,6 +125,7 @@ export {
Status as status,
ConnectivityState as connectivityState,
Propagate as propagate,
CompressionAlgorithms as compressionAlgorithms
// TODO: Other constants as well
};

Expand Down
88 changes: 77 additions & 11 deletions packages/grpc-js/src/server-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import { EventEmitter } from 'events';
import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';
import * as zlib from 'zlib';

import { Deadline, StatusObject } from './call-stream';
import {
Expand All @@ -32,6 +33,7 @@ import { StreamDecoder } from './stream-decoder';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { ChannelOptions } from './channel-options';
import * as logging from './logging';
import { MetadataValue } from '.';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no real reason to represent the encoding as a MetadataValue through the code here. The header key doesn't end with -bin, so the value is just a string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetadataValue is just what's returned from calling .get when we initially pull out the encoding:
const encoding: MetadataValue | undefined = metadata.get('grpc-encoding')[0];

I can coerce it to a string if that'll reduce potential confusion, though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could narrow the actual return type in the Metadata definition using string literal types, but that would require a big jump in the minimum compiler version needed to use this library, so I'm not yet sure it's a good idea. In the meantime, I think this is a valid use of type coercion: we know that certain properties of the string guarantee certain narrower types, and the compiler just isn't aware of that.

So, in short, yes, please coerce the encoding to a string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, changed - I suppose it's also part of the spec that the grpc-encoding header needs to be a string value.


const TRACER_NAME = 'server_call';

Expand Down Expand Up @@ -60,7 +62,7 @@ const deadlineUnitsToMs: DeadlineUnitIndexSignature = {
const defaultResponseHeaders = {
// TODO(cjihrig): Remove these encoding headers from the default response
// once compression is integrated.
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity',
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip',
[GRPC_ENCODING_HEADER]: 'identity',
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK,
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto',
Expand Down Expand Up @@ -136,12 +138,13 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
constructor(
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
public deserialize: Deserialize<RequestType>
public deserialize: Deserialize<RequestType>,
encoding?: MetadataValue
) {
super({ objectMode: true });
this.cancelled = false;
this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.call.setupReadable(this, encoding);
}

_read(size: number) {
Expand Down Expand Up @@ -250,13 +253,14 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType>
private call: Http2ServerCallStream<RequestType, ResponseType>,
public metadata: Metadata,
public serialize: Serialize<ResponseType>,
public deserialize: Deserialize<RequestType>
public deserialize: Deserialize<RequestType>,
encoding?: MetadataValue
) {
super({ objectMode: true });
this.cancelled = false;
this.trailingMetadata = new Metadata();
this.call.setupSurfaceCall(this);
this.call.setupReadable(this);
this.call.setupReadable(this, encoding);

this.on('error', (err) => {
this.call.sendError(err);
Expand Down Expand Up @@ -439,6 +443,47 @@ export class Http2ServerCallStream<
return this.cancelled;
}

private getDecompressedMessage(message: Buffer, encoding?: MetadataValue) {
switch (encoding) {
case 'deflate': {
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.inflate(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
const joined = Buffer.concat([message.slice(0, 5), output]);
resolve(joined);
}
});
});
}

case 'gzip': {
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.unzip(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
const joined = Buffer.concat([message.slice(0, 5), output]);
resolve(joined);
}
});
});
}

default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a separate "identity" case here that does not modify the message, and then the default case will be for unrecognized compression algorithms, and that should end the call with an UNIMPLEMENTED error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return Promise.resolve(message);
}
}

sendMetadata(customMetadata?: Metadata) {
if (this.checkCancelled()) {
return;
Expand Down Expand Up @@ -469,7 +514,7 @@ export class Http2ServerCallStream<
const err = new Error('Invalid deadline') as ServerErrorResponse;
err.code = Status.OUT_OF_RANGE;
this.sendError(err);
return;
return metadata;
}

const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0;
Expand All @@ -484,13 +529,12 @@ export class Http2ServerCallStream<
metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING);
metadata.remove(http2.constants.HTTP2_HEADER_TE);
metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE);
metadata.remove('grpc-encoding');
metadata.remove('grpc-accept-encoding');

return metadata;
}

receiveUnaryMessage(): Promise<RequestType> {
receiveUnaryMessage(encoding?: MetadataValue): Promise<RequestType> {
return new Promise((resolve, reject) => {
const stream = this.stream;
const chunks: Buffer[] = [];
Expand All @@ -516,7 +560,19 @@ export class Http2ServerCallStream<
}

this.emit('receiveMessage');
resolve(this.deserializeMessage(requestBytes));

const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : undefined;
const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding);

// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) {
resolve();
}
else {
resolve(this.deserializeMessage(decompressedMessage));
}
} catch (err) {
err.code = Status.INTERNAL;
this.sendError(err);
Expand Down Expand Up @@ -673,7 +729,8 @@ export class Http2ServerCallStream<
setupReadable(
readable:
| ServerReadableStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>
| ServerDuplexStream<RequestType, ResponseType>,
encoding?: MetadataValue
) {
const decoder = new StreamDecoder();

Expand All @@ -692,7 +749,16 @@ export class Http2ServerCallStream<
return;
}
this.emit('receiveMessage');
this.pushOrBufferMessage(readable, message);

const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : undefined;
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);

// Encountered an error with decompression; it'll already have been propogated back
// Just return early
if (!decompressedMessage) return;

this.pushOrBufferMessage(readable, decompressedMessage);
}
});

Expand Down