-
Notifications
You must be signed in to change notification settings - Fork 617
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
grpc-js: Allow per-channel request compression from the client and decompression from the server #1952
grpc-js: Allow per-channel request compression from the client and decompression from the server #1952
Changes from 13 commits
7aa5b62
cec7e64
b2ebbee
16f1356
9d70f39
959503e
af01007
d68d94a
6d75ea1
21b09e2
e03c115
7a31b4a
25a1806
d01ff79
6ba0081
53a2347
c9659b5
47bb8b6
9150bdf
5c61a6a
dc9752a
deff9d4
c4d7fab
96ae102
9cdd36f
69428b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* | ||
* Copyright 2021 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
export const CompressionAlgorithms = { | ||
0: 'identity', | ||
1: 'deflate', | ||
2: 'gzip' | ||
} as const; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,11 +17,23 @@ | |
|
||
import * as zlib from 'zlib'; | ||
|
||
import { Call, WriteFlags, WriteObject } from './call-stream'; | ||
import { Call, WriteObject, WriteFlags } from './call-stream'; | ||
import { Channel } from './channel'; | ||
import { ChannelOptions } from './channel-options'; | ||
import { CompressionAlgorithms } from './compression-algorithms'; | ||
import { LogVerbosity } from './constants'; | ||
import { BaseFilter, Filter, FilterFactory } from './filter'; | ||
import * as logging from './logging'; | ||
import { Metadata, MetadataValue } from './metadata'; | ||
|
||
const CompressionAlgorithKeys = new Set(Object.keys(CompressionAlgorithms)); | ||
|
||
const isCompressionAlgorithmKey = (key: number | undefined): key is keyof typeof CompressionAlgorithms => { | ||
return typeof key === 'number' && CompressionAlgorithKeys.has(key.toString()); | ||
} | ||
|
||
type CompressionAlgorithm = (typeof CompressionAlgorithms)[keyof typeof CompressionAlgorithms]; | ||
|
||
abstract class CompressionHandler { | ||
protected abstract compressMessage(message: Buffer): Promise<Buffer>; | ||
protected abstract decompressMessage(data: Buffer): Promise<Buffer>; | ||
|
@@ -167,10 +179,31 @@ function getCompressionHandler(compressionName: string): CompressionHandler { | |
export class CompressionFilter extends BaseFilter implements Filter { | ||
private sendCompression: CompressionHandler = new IdentityHandler(); | ||
private receiveCompression: CompressionHandler = new IdentityHandler(); | ||
private defaultCompressionAlgorithm: CompressionAlgorithm | undefined; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At this point this field would more accurately be called Also, please remove There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. |
||
|
||
constructor(channelOptions: ChannelOptions) { | ||
super(); | ||
|
||
const compressionAlgorithmKey = channelOptions['grpc.default_compression_algorithm']; | ||
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) { | ||
this.defaultCompressionAlgorithm = CompressionAlgorithms[compressionAlgorithmKey]; | ||
this.sendCompression = getCompressionHandler(this.defaultCompressionAlgorithm); | ||
} else { | ||
logging.log(LogVerbosity.ERROR, 'Invalid value provided for grpc.default_compression_algorithm option'); | ||
b0b3rt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
murgatroid99 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
async sendMetadata(metadata: Promise<Metadata>): Promise<Metadata> { | ||
const headers: Metadata = await metadata; | ||
headers.set('grpc-accept-encoding', 'identity,deflate,gzip'); | ||
headers.set('accept-encoding', 'identity'); | ||
|
||
if (this.defaultCompressionAlgorithm && ['deflate', 'gzip'].includes(this.defaultCompressionAlgorithm)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like the explicit check for "deflate" and "gzip" here. It's effectively duplicating information in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, good call, there's no reason to have this check anymore. I think explicitly excluding |
||
headers.set('grpc-encoding', this.defaultCompressionAlgorithm); | ||
} else { | ||
headers.remove('grpc-encoding'); | ||
} | ||
|
||
return headers; | ||
} | ||
|
||
|
@@ -183,6 +216,18 @@ export class CompressionFilter extends BaseFilter implements Filter { | |
} | ||
} | ||
metadata.remove('grpc-encoding'); | ||
|
||
/* Check to see if the compression we're using to send messages is supported by the server | ||
* If not, reset the sendCompression filter and have it use the default IdentityHandler */ | ||
const serverSupportedEncodingsHeader = metadata.get('grpc-accept-encoding')[0] as string | undefined; | ||
if (serverSupportedEncodingsHeader) { | ||
const serverSupportedEncodings = serverSupportedEncodingsHeader.split(','); | ||
|
||
if ((this.sendCompression instanceof DeflateHandler && !serverSupportedEncodings.includes('deflate')) | ||
|| (this.sendCompression instanceof GzipHandler && !serverSupportedEncodings.includes('gzip'))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this whole There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, simplified (and added). |
||
this.sendCompression = new IdentityHandler(); | ||
} | ||
} | ||
metadata.remove('grpc-accept-encoding'); | ||
return metadata; | ||
} | ||
|
@@ -194,7 +239,7 @@ export class CompressionFilter extends BaseFilter implements Filter { | |
const resolvedMessage: WriteObject = await message; | ||
const compress = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this assignment needs to be expanded out at this point. It's three conditionals and too many negatives, and I think the ordering is a little confusing. I think it would be better for the handler check to be the primary thing, and separate from the flags check. So, something like this: if (this.sendCompression instanceof IdentityHandler) {
compress = false;
} else {
compress = ((resolvedMessage.flags ?? 0) & WriteFlags.NoCompress) === 0;
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it's a bit too much for a ternary expression. Split it out as suggested. |
||
resolvedMessage.flags === undefined | ||
? false | ||
? !(this.sendCompression instanceof IdentityHandler) | ||
: (resolvedMessage.flags & WriteFlags.NoCompress) === 0; | ||
return { | ||
message: await this.sendCompression.writeMessage( | ||
|
@@ -216,8 +261,8 @@ export class CompressionFilter extends BaseFilter implements Filter { | |
|
||
export class CompressionFilterFactory | ||
implements FilterFactory<CompressionFilter> { | ||
constructor(private readonly channel: Channel) {} | ||
constructor(private readonly channel: Channel, private readonly options: ChannelOptions) {} | ||
createFilter(callStream: Call): CompressionFilter { | ||
return new CompressionFilter(); | ||
return new CompressionFilter(this.options); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import { EventEmitter } from 'events'; | ||
import * as http2 from 'http2'; | ||
import { Duplex, Readable, Writable } from 'stream'; | ||
import * as zlib from 'zlib'; | ||
|
||
import { Deadline, StatusObject } from './call-stream'; | ||
import { | ||
|
@@ -60,7 +61,7 @@ const deadlineUnitsToMs: DeadlineUnitIndexSignature = { | |
const defaultResponseHeaders = { | ||
// TODO(cjihrig): Remove these encoding headers from the default response | ||
// once compression is integrated. | ||
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity', | ||
[GRPC_ACCEPT_ENCODING_HEADER]: 'identity,deflate,gzip', | ||
[GRPC_ENCODING_HEADER]: 'identity', | ||
[http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_OK, | ||
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: 'application/grpc+proto', | ||
|
@@ -136,12 +137,13 @@ export class ServerReadableStreamImpl<RequestType, ResponseType> | |
constructor( | ||
private call: Http2ServerCallStream<RequestType, ResponseType>, | ||
public metadata: Metadata, | ||
public deserialize: Deserialize<RequestType> | ||
public deserialize: Deserialize<RequestType>, | ||
encoding?: string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As in the client-side filter code, throughout this file, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, went ahead and made that consistent everywhere. |
||
) { | ||
super({ objectMode: true }); | ||
this.cancelled = false; | ||
this.call.setupSurfaceCall(this); | ||
this.call.setupReadable(this); | ||
this.call.setupReadable(this, encoding); | ||
} | ||
|
||
_read(size: number) { | ||
|
@@ -250,13 +252,14 @@ export class ServerDuplexStreamImpl<RequestType, ResponseType> | |
private call: Http2ServerCallStream<RequestType, ResponseType>, | ||
public metadata: Metadata, | ||
public serialize: Serialize<ResponseType>, | ||
public deserialize: Deserialize<RequestType> | ||
public deserialize: Deserialize<RequestType>, | ||
encoding?: string | ||
) { | ||
super({ objectMode: true }); | ||
this.cancelled = false; | ||
this.trailingMetadata = new Metadata(); | ||
this.call.setupSurfaceCall(this); | ||
this.call.setupReadable(this); | ||
this.call.setupReadable(this, encoding); | ||
|
||
this.on('error', (err) => { | ||
this.call.sendError(err); | ||
|
@@ -439,6 +442,47 @@ export class Http2ServerCallStream< | |
return this.cancelled; | ||
} | ||
|
||
private getDecompressedMessage(message: Buffer, encoding?: string) { | ||
switch (encoding) { | ||
case 'deflate': { | ||
return new Promise<Buffer | undefined>((resolve, reject) => { | ||
zlib.inflate(message.slice(5), (err, output) => { | ||
if (err) { | ||
this.sendError({ | ||
code: Status.INTERNAL, | ||
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, | ||
}); | ||
resolve(); | ||
} else { | ||
const joined = Buffer.concat([message.slice(0, 5), output]); | ||
resolve(joined); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
case 'gzip': { | ||
return new Promise<Buffer | undefined>((resolve, reject) => { | ||
zlib.unzip(message.slice(5), (err, output) => { | ||
if (err) { | ||
this.sendError({ | ||
code: Status.INTERNAL, | ||
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, | ||
}); | ||
resolve(); | ||
} else { | ||
const joined = Buffer.concat([message.slice(0, 5), output]); | ||
resolve(joined); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
default: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There should be a separate "identity" case here that does not modify the message, and then the default case will be for unrecognized compression algorithms, and that should end the call with an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return Promise.resolve(message); | ||
} | ||
} | ||
|
||
sendMetadata(customMetadata?: Metadata) { | ||
if (this.checkCancelled()) { | ||
return; | ||
|
@@ -469,7 +513,7 @@ export class Http2ServerCallStream< | |
const err = new Error('Invalid deadline') as ServerErrorResponse; | ||
err.code = Status.OUT_OF_RANGE; | ||
this.sendError(err); | ||
return; | ||
return metadata; | ||
} | ||
|
||
const timeout = (+match[1] * deadlineUnitsToMs[match[2]]) | 0; | ||
|
@@ -484,13 +528,12 @@ export class Http2ServerCallStream< | |
metadata.remove(http2.constants.HTTP2_HEADER_ACCEPT_ENCODING); | ||
metadata.remove(http2.constants.HTTP2_HEADER_TE); | ||
metadata.remove(http2.constants.HTTP2_HEADER_CONTENT_TYPE); | ||
metadata.remove('grpc-encoding'); | ||
metadata.remove('grpc-accept-encoding'); | ||
|
||
return metadata; | ||
} | ||
|
||
receiveUnaryMessage(): Promise<RequestType> { | ||
receiveUnaryMessage(encoding?: string): Promise<RequestType> { | ||
return new Promise((resolve, reject) => { | ||
const stream = this.stream; | ||
const chunks: Buffer[] = []; | ||
|
@@ -516,7 +559,19 @@ export class Http2ServerCallStream< | |
} | ||
|
||
this.emit('receiveMessage'); | ||
resolve(this.deserializeMessage(requestBytes)); | ||
|
||
const compressed = requestBytes.readUInt8(0) === 1; | ||
const compressedMessageEncoding = compressed ? encoding : undefined; | ||
const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding); | ||
|
||
// Encountered an error with decompression; it'll already have been propogated back | ||
// Just return early | ||
if (!decompressedMessage) { | ||
resolve(); | ||
} | ||
else { | ||
resolve(this.deserializeMessage(decompressedMessage)); | ||
} | ||
} catch (err) { | ||
err.code = Status.INTERNAL; | ||
this.sendError(err); | ||
|
@@ -673,7 +728,8 @@ export class Http2ServerCallStream< | |
setupReadable( | ||
readable: | ||
| ServerReadableStream<RequestType, ResponseType> | ||
| ServerDuplexStream<RequestType, ResponseType> | ||
| ServerDuplexStream<RequestType, ResponseType>, | ||
encoding?: string | ||
) { | ||
const decoder = new StreamDecoder(); | ||
|
||
|
@@ -692,7 +748,16 @@ export class Http2ServerCallStream< | |
return; | ||
} | ||
this.emit('receiveMessage'); | ||
this.pushOrBufferMessage(readable, message); | ||
|
||
const compressed = message.readUInt8(0) === 1; | ||
const compressedMessageEncoding = compressed ? encoding : undefined; | ||
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding); | ||
|
||
// Encountered an error with decompression; it'll already have been propogated back | ||
// Just return early | ||
if (!decompressedMessage) return; | ||
|
||
this.pushOrBufferMessage(readable, decompressedMessage); | ||
} | ||
}); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the benefit of using
const
here instead ofenum
? I think an enum would be better for helping the user populate the channel option, because they could useCompressionAlgorithm.gzip
, for example.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally I disprefer enums in Typescript because they're one of the few language features which aren't runtime-only, and they interact somewhat poorly with the rest of the type system, since they're somewhat poorly supported and the language designers seem to pushing people in the direction of using object literals (with
as const
) instead.Those probably aren't huge concerns in this case; I don't see a way that making it an enum would frustrate a typical user of the library. For user convenience and consistency with the way other such options are handled, I'll change it to an enum.