Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jronak committed Nov 21, 2022
1 parent 1b30a5a commit e890733
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
8 changes: 4 additions & 4 deletions internal/transport/transport.go
Expand Up @@ -348,12 +348,12 @@ func (s *Stream) RecvCompress() string {
}

// SetSendCompress sets the compression algorithm to the stream.
func (s *Stream) SetSendCompress(str string) error {
func (s *Stream) SetSendCompress(name string) error {
if s.isHeaderSent() || s.getState() == streamDone {
return status.Error(codes.Internal, "transport: set send compressor called after headers sent or stream done")
}

s.sendCompress = str
s.sendCompress = name
return nil
}

Expand All @@ -362,8 +362,8 @@ func (s *Stream) SendCompress() string {
return s.sendCompress
}

// ClientAdvertisedCompressors returns the advertised compressor names by the
// client.
// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via :grpc-accept-encoding header.
func (s *Stream) ClientAdvertisedCompressors() string {
return s.clientAdvertisedCompressors
}
Expand Down
31 changes: 22 additions & 9 deletions server.go
Expand Up @@ -1289,17 +1289,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
cp = s.opts.cp
_ = stream.SetSendCompress(cp.Type())
sendCompressorName = cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
_ = stream.SetSendCompress(rc)
sendCompressorName = comp.Name()
}
}

if sendCompressorName != "" {
// Safe to ignore returned error value as we are guaranteed to succeed here
_ = stream.SetSendCompress(sendCompressorName)
}

var payInfo *payloadInfo
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
Expand Down Expand Up @@ -1383,6 +1386,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
opts := &transport.Options{Last: true}

// Server handler could have set new compressor by calling SetSendCompressor.
// In case it is set, we need to use it for compressing outbound message.
if stream.SendCompress() != sendCompressorName {
comp = encoding.GetCompressor(stream.SendCompress())
}
Expand Down Expand Up @@ -1613,17 +1618,20 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
ss.cp = s.opts.cp
_ = stream.SetSendCompress(s.opts.cp.Type())
ss.sendCompressorName = s.opts.cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
ss.comp = encoding.GetCompressor(rc)
if ss.comp != nil {
_ = stream.SetSendCompress(rc)
ss.sendCompressorName = rc
}
}

if ss.sendCompressorName != "" {
// Safe to ignore returned error value as we are guaranteed to succeed here
_ = stream.SetSendCompress(ss.sendCompressorName)
}

ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)

if trInfo != nil {
Expand Down Expand Up @@ -1953,20 +1961,25 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
return nil
}

// SetSendCompressor sets the compressor that will be used when sending
// RPC payload back to the client. It may be called at most once, and must not
// be called after any event that causes headers to be sent (see SetHeader for
// a complete list). Provided compressor is used when below conditions are met:
// SetSendCompressor sets a compressor for outbound messages.
// It may be called at most once, and must not be called after any event that
// causes headers to be sent (see SetHeader for a complete list). Provided
// compressor is used when below conditions are met:
//
// - compressor is registered via encoding.RegisterCompressor
// - compressor name exists in the client advertised compressor names sent in
// grpc-accept-encoding metadata.
// :grpc-accept-encoding header.
//
// The context provided must be the context passed to the server's handler.
//
// The error returned is compatible with the status package. However, the
// status code will often not match the RPC status as seen by the client
// application, and therefore, should not be relied upon for this purpose.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetSendCompressor(ctx context.Context, name string) error {
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
if !ok || stream == nil {
Expand Down
2 changes: 2 additions & 0 deletions stream.go
Expand Up @@ -1583,6 +1583,8 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()

// Server handler could have set new compressor by calling SetSendCompressor.
// In case it is set, we need to use it for compressing outbound message.
if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
ss.comp = encoding.GetCompressor(sendCompressorsName)
ss.sendCompressorName = sendCompressorsName
Expand Down

0 comments on commit e890733

Please sign in to comment.