Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use exact size, if known, to allocate decompression buffer #3048

Merged
merged 5 commits into from Oct 4, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions encoding/encoding.go
Expand Up @@ -48,6 +48,14 @@ type Compressor interface {
Name() string
}

// CompressorSizer is optional, can be implemented to improve efficiency.
// This API is EXPERIMENTAL.
type CompressorSizer interface {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
// DecompressedSize returns the exact size the message will
// uncompress into, if known.
DecompressedSize(buf []byte, maxSize int) (int, error)
}

var registeredCompressor = make(map[string]Compressor)

// RegisterCompressor registers the compressor with gRPC by its name. It can
Expand Down
16 changes: 16 additions & 0 deletions encoding/gzip/gzip.go
Expand Up @@ -23,9 +23,11 @@ package gzip

import (
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"sync"

"google.golang.org/grpc/encoding"
Expand Down Expand Up @@ -107,6 +109,20 @@ func (z *reader) Read(p []byte) (n int, err error) {
return n, err
}

// RFC1952 specifies that the last four bytes "contains the size of
// the original (uncompressed) input data modulo 2^32."
func (c *compressor) DecompressedSize(buf []byte, maxSize int) (int, error) {
if int64(maxSize) > int64(math.MaxUint32) {
dfawley marked this conversation as resolved.
Show resolved Hide resolved
return 0, fmt.Errorf("grpc: message size not known when messages can be longer than 4GB")
}
last := len(buf)
if last < 4 {
return 0, fmt.Errorf("grpc: invalid gzip buffer")
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}
size := binary.LittleEndian.Uint32(buf[last-4 : last])
return int(size), nil
}

func (c *compressor) Name() string {
return Name
}
Expand Down
55 changes: 40 additions & 15 deletions rpc_util.go
Expand Up @@ -648,35 +648,60 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
return nil, st.Err()
}

var size int
if pf == compressionMade {
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
d, err = dc.Do(bytes.NewReader(d))
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
size = len(d)
} else {
dcReader, err := compressor.Decompress(bytes.NewReader(d))
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
} else {
size = len(d)
}
if len(d) > maxReceiveMessageSize {
if size > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
return d, nil
}

// Using compressor, decompress d, returning data and size.
// Optionally, if data will be over maxReceiveMessageSize, just return the size.
func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize int) ([]byte, int, error) {
dcReader, err := compressor.Decompress(bytes.NewReader(d))
if err != nil {
return nil, 0, err
}
if sizer, ok := compressor.(encoding.CompressorSizer); ok {
if size, err := sizer.DecompressedSize(d, maxReceiveMessageSize); err == nil {
if size > maxReceiveMessageSize {
return nil, size, nil
}
var buf bytes.Buffer
buf.Grow(size + bytes.MinRead) // extra space guarantees no reallocation
dfawley marked this conversation as resolved.
Show resolved Hide resolved
bytesRead, err := buf.ReadFrom(dcReader)
if err != nil {
return nil, size, err
}
if bytesRead != int64(size) {
return nil, size, fmt.Errorf("read different size than expected (%d vs. %d)", bytesRead, size)
}
return buf.Bytes(), size, nil
}
}
// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
d, err = ioutil.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
return d, len(d), err
}

// For the two compressor parameters, both should not be set, but if they are,
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
Expand Down