From e89073323817ae08f193f8f8916df7582a38fb0f Mon Sep 17 00:00:00 2001 From: Ronak Jain Date: Mon, 21 Nov 2022 22:59:08 +0530 Subject: [PATCH] address comments --- internal/transport/transport.go | 8 ++++---- server.go | 31 ++++++++++++++++++++++--------- stream.go | 2 ++ 3 files changed, 28 insertions(+), 13 deletions(-) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 4941f3d14b67..ef642392061b 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -348,12 +348,12 @@ func (s *Stream) RecvCompress() string { } // SetSendCompress sets the compression algorithm to the stream. -func (s *Stream) SetSendCompress(str string) error { +func (s *Stream) SetSendCompress(name string) error { if s.isHeaderSent() || s.getState() == streamDone { return status.Error(codes.Internal, "transport: set send compressor called after headers sent or stream done") } - s.sendCompress = str + s.sendCompress = name return nil } @@ -362,8 +362,8 @@ func (s *Stream) SendCompress() string { return s.sendCompress } -// ClientAdvertisedCompressors returns the advertised compressor names by the -// client. +// ClientAdvertisedCompressors returns the compressor names advertised by the +// client via :grpc-accept-encoding header. func (s *Stream) ClientAdvertisedCompressors() string { return s.clientAdvertisedCompressors } diff --git a/server.go b/server.go index 13b346876ab9..69cdec19a1da 100644 --- a/server.go +++ b/server.go @@ -1289,17 +1289,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. if s.opts.cp != nil { cp = s.opts.cp - _ = stream.SetSendCompress(cp.Type()) sendCompressorName = cp.Type() } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { // Legacy compressor not specified; attempt to respond with same encoding. comp = encoding.GetCompressor(rc) if comp != nil { - _ = stream.SetSendCompress(rc) sendCompressorName = comp.Name() } } + if sendCompressorName != "" { + // Safe to ignore returned error value as we are guaranteed to succeed here + _ = stream.SetSendCompress(sendCompressorName) + } + var payInfo *payloadInfo if len(shs) != 0 || len(binlogs) != 0 { payInfo = &payloadInfo{} @@ -1383,6 +1386,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } opts := &transport.Options{Last: true} + // Server handler could have set new compressor by calling SetSendCompressor. + // In case it is set, we need to use it for compressing outbound message. if stream.SendCompress() != sendCompressorName { comp = encoding.GetCompressor(stream.SendCompress()) } @@ -1613,17 +1618,20 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. if s.opts.cp != nil { ss.cp = s.opts.cp - _ = stream.SetSendCompress(s.opts.cp.Type()) ss.sendCompressorName = s.opts.cp.Type() } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { // Legacy compressor not specified; attempt to respond with same encoding. ss.comp = encoding.GetCompressor(rc) if ss.comp != nil { - _ = stream.SetSendCompress(rc) ss.sendCompressorName = rc } } + if ss.sendCompressorName != "" { + // Safe to ignore returned error value as we are guaranteed to succeed here + _ = stream.SetSendCompress(ss.sendCompressorName) + } + ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp) if trInfo != nil { @@ -1953,20 +1961,25 @@ func SendHeader(ctx context.Context, md metadata.MD) error { return nil } -// SetSendCompressor sets the compressor that will be used when sending -// RPC payload back to the client. It may be called at most once, and must not -// be called after any event that causes headers to be sent (see SetHeader for -// a complete list). Provided compressor is used when below conditions are met: +// SetSendCompressor sets a compressor for outbound messages. +// It may be called at most once, and must not be called after any event that +// causes headers to be sent (see SetHeader for a complete list). Provided +// compressor is used when below conditions are met: // // - compressor is registered via encoding.RegisterCompressor // - compressor name exists in the client advertised compressor names sent in -// grpc-accept-encoding metadata. +// :grpc-accept-encoding header. // // The context provided must be the context passed to the server's handler. // // The error returned is compatible with the status package. However, the // status code will often not match the RPC status as seen by the client // application, and therefore, should not be relied upon for this purpose. +// +// # Experimental +// +// Notice: This type is EXPERIMENTAL and may be changed or removed in a +// later release. func SetSendCompressor(ctx context.Context, name string) error { stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) if !ok || stream == nil { diff --git a/stream.go b/stream.go index 87ce53cbc349..6685a3ec3a62 100644 --- a/stream.go +++ b/stream.go @@ -1583,6 +1583,8 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { } }() + // Server handler could have set new compressor by calling SetSendCompressor. + // In case it is set, we need to use it for compressing outbound message. if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName { ss.comp = encoding.GetCompressor(sendCompressorsName) ss.sendCompressorName = sendCompressorsName