From 7b817b4d182b8f17e9765f51812cd4f168aca09f Mon Sep 17 00:00:00 2001 From: Ronak Jain Date: Wed, 12 Oct 2022 05:07:02 +0530 Subject: [PATCH] client: set grpc-accept-encoding to full list of registered compressors (#5541) --- encoding/encoding.go | 3 ++ internal/envconfig/envconfig.go | 8 ++- internal/grpcutil/compressor.go | 47 +++++++++++++++++ internal/grpcutil/compressor_test.go | 46 +++++++++++++++++ internal/transport/http2_client.go | 17 +++++- test/end2end_test.go | 77 ++++++++++++++++++++++++++++ 6 files changed, 195 insertions(+), 3 deletions(-) create mode 100644 internal/grpcutil/compressor.go create mode 100644 internal/grpcutil/compressor_test.go diff --git a/encoding/encoding.go b/encoding/encoding.go index 18e530fc902..9151eba26ac 100644 --- a/encoding/encoding.go +++ b/encoding/encoding.go @@ -28,6 +28,8 @@ package encoding import ( "io" "strings" + + "google.golang.org/grpc/internal/grpcutil" ) // Identity specifies the optional encoding for uncompressed streams. @@ -73,6 +75,7 @@ var registeredCompressor = make(map[string]Compressor) // registered with the same name, the one registered last will take effect. func RegisterCompressor(c Compressor) { registeredCompressor[c.Name()] = c + grpcutil.RegisteredCompressorNames = append(grpcutil.RegisteredCompressorNames, c.Name()) } // GetCompressor returns Compressor for the given compressor name. diff --git a/internal/envconfig/envconfig.go b/internal/envconfig/envconfig.go index 6f027254311..7edd196bd3d 100644 --- a/internal/envconfig/envconfig.go +++ b/internal/envconfig/envconfig.go @@ -25,11 +25,15 @@ import ( ) const ( - prefix = "GRPC_GO_" - txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS" + prefix = "GRPC_GO_" + txtErrIgnoreStr = prefix + "IGNORE_TXT_ERRORS" + advertiseCompressorsStr = prefix + "ADVERTISE_COMPRESSORS" ) var ( // TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false"). TXTErrIgnore = !strings.EqualFold(os.Getenv(txtErrIgnoreStr), "false") + // AdvertiseCompressors is set if registered compressor should be advertised + // ("GRPC_GO_ADVERTISE_COMPRESSORS" is not "false"). + AdvertiseCompressors = !strings.EqualFold(os.Getenv(advertiseCompressorsStr), "false") ) diff --git a/internal/grpcutil/compressor.go b/internal/grpcutil/compressor.go new file mode 100644 index 00000000000..9f409096798 --- /dev/null +++ b/internal/grpcutil/compressor.go @@ -0,0 +1,47 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcutil + +import ( + "strings" + + "google.golang.org/grpc/internal/envconfig" +) + +// RegisteredCompressorNames holds names of the registered compressors. +var RegisteredCompressorNames []string + +// IsCompressorNameRegistered returns true when name is available in registry. +func IsCompressorNameRegistered(name string) bool { + for _, compressor := range RegisteredCompressorNames { + if compressor == name { + return true + } + } + return false +} + +// RegisteredCompressors returns a string of registered compressor names +// separated by comma. +func RegisteredCompressors() string { + if !envconfig.AdvertiseCompressors { + return "" + } + return strings.Join(RegisteredCompressorNames, ",") +} diff --git a/internal/grpcutil/compressor_test.go b/internal/grpcutil/compressor_test.go new file mode 100644 index 00000000000..0d639422a9a --- /dev/null +++ b/internal/grpcutil/compressor_test.go @@ -0,0 +1,46 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcutil + +import ( + "testing" + + "google.golang.org/grpc/internal/envconfig" +) + +func TestRegisteredCompressors(t *testing.T) { + defer func(c []string) { RegisteredCompressorNames = c }(RegisteredCompressorNames) + defer func(v bool) { envconfig.AdvertiseCompressors = v }(envconfig.AdvertiseCompressors) + RegisteredCompressorNames = []string{"gzip", "snappy"} + tests := []struct { + desc string + enabled bool + want string + }{ + {desc: "compressor_ad_disabled", enabled: false, want: ""}, + {desc: "compressor_ad_enabled", enabled: true, want: "gzip,snappy"}, + } + for _, tt := range tests { + envconfig.AdvertiseCompressors = tt.enabled + compressors := RegisteredCompressors() + if compressors != tt.want { + t.Fatalf("Unexpected compressors got:%s, want:%s", compressors, tt.want) + } + } +} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 1d8c9be4fa4..5251e28d734 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -110,6 +110,7 @@ type http2Client struct { streamsQuotaAvailable chan struct{} waitingStreams uint32 nextID uint32 + registeredCompressors string // Do not access controlBuf with mu held. mu sync.Mutex // guard the following variables @@ -300,6 +301,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ctxDone: ctx.Done(), // Cache Done chan. cancel: cancel, userAgent: opts.UserAgent, + registeredCompressors: grpcutil.RegisteredCompressors(), conn: conn, remoteAddr: conn.RemoteAddr(), localAddr: conn.LocalAddr(), @@ -508,9 +510,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)}) } + registeredCompressors := t.registeredCompressors if callHdr.SendCompress != "" { headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) - headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress}) + // Include the outgoing compressor name when compressor is not registered + // via encoding.RegisterCompressor. This is possible when client uses + // WithCompressor dial option. + if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) { + if registeredCompressors != "" { + registeredCompressors += "," + } + registeredCompressors += callHdr.SendCompress + } + } + + if registeredCompressors != "" { + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors}) } if dl, ok := ctx.Deadline(); ok { // Send out timeout regardless its value. The server can detect timeout context by itself. diff --git a/test/end2end_test.go b/test/end2end_test.go index ecf5b5e303e..165cf19b987 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3250,6 +3250,7 @@ func testMetadataUnaryRPC(t *testing.T, e env) { delete(header, "date") // the Date header is also optional delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") } if !reflect.DeepEqual(header, testMetadata) { t.Fatalf("Received header metadata %v, want %v", header, testMetadata) @@ -3289,6 +3290,7 @@ func testMetadataOrderUnaryRPC(t *testing.T, e env) { delete(header, "date") // the Date header is also optional delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") } if !reflect.DeepEqual(header, newMetadata) { @@ -3401,6 +3403,8 @@ func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) { } delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") + expectedHeader := metadata.Join(testMetadata, testMetadata2) if !reflect.DeepEqual(header, expectedHeader) { t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) @@ -3445,6 +3449,7 @@ func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) { } delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") expectedHeader := metadata.Join(testMetadata, testMetadata2) if !reflect.DeepEqual(header, expectedHeader) { t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) @@ -3488,6 +3493,7 @@ func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) { } delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") expectedHeader := metadata.Join(testMetadata, testMetadata2) if !reflect.DeepEqual(header, expectedHeader) { t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) @@ -3528,6 +3534,7 @@ func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) { } delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") expectedHeader := metadata.Join(testMetadata, testMetadata2) if !reflect.DeepEqual(header, expectedHeader) { t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) @@ -3591,6 +3598,7 @@ func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) { } delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") expectedHeader := metadata.Join(testMetadata, testMetadata2) if !reflect.DeepEqual(header, expectedHeader) { t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) @@ -3651,6 +3659,7 @@ func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) { } delete(header, "user-agent") delete(header, "content-type") + delete(header, "grpc-accept-encoding") expectedHeader := metadata.Join(testMetadata, testMetadata2) if !reflect.DeepEqual(header, expectedHeader) { t.Fatalf("Received header metadata %v, want %v", header, expectedHeader) @@ -3982,6 +3991,7 @@ func testMetadataStreamingRPC(t *testing.T, e env) { delete(headerMD, "trailer") // ignore if present delete(headerMD, "user-agent") delete(headerMD, "content-type") + delete(headerMD, "grpc-accept-encoding") if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { t.Errorf("#1 %v.Header() = %v, %v, want %v, ", stream, headerMD, err, testMetadata) } @@ -3990,6 +4000,7 @@ func testMetadataStreamingRPC(t *testing.T, e env) { delete(headerMD, "trailer") // ignore if present delete(headerMD, "user-agent") delete(headerMD, "content-type") + delete(headerMD, "grpc-accept-encoding") if err != nil || !reflect.DeepEqual(testMetadata, headerMD) { t.Errorf("#2 %v.Header() = %v, %v, want %v, ", stream, headerMD, err, testMetadata) } @@ -5432,6 +5443,72 @@ func (s) TestForceServerCodec(t *testing.T) { } } +// renameCompressor is a grpc.Compressor wrapper that allows customizing the +// Type() of another compressor. +type renameCompressor struct { + grpc.Compressor + name string +} + +func (r *renameCompressor) Type() string { return r.name } + +// renameDecompressor is a grpc.Decompressor wrapper that allows customizing the +// Type() of another Decompressor. +type renameDecompressor struct { + grpc.Decompressor + name string +} + +func (r *renameDecompressor) Type() string { return r.name } + +func (s) TestClientForwardsGrpcAcceptEncodingHeader(t *testing.T) { + wantGrpcAcceptEncodingCh := make(chan []string, 1) + defer close(wantGrpcAcceptEncodingCh) + + compressor := renameCompressor{Compressor: grpc.NewGZIPCompressor(), name: "testgzip"} + decompressor := renameDecompressor{Decompressor: grpc.NewGZIPDecompressor(), name: "testgzip"} + + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, status.Errorf(codes.Internal, "no metadata in context") + } + if got, want := md["grpc-accept-encoding"], <-wantGrpcAcceptEncodingCh; !reflect.DeepEqual(got, want) { + return nil, status.Errorf(codes.Internal, "got grpc-accept-encoding=%q; want [%q]", got, want) + } + return &testpb.Empty{}, nil + }, + } + if err := ss.Start([]grpc.ServerOption{grpc.RPCDecompressor(&decompressor)}); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + wantGrpcAcceptEncodingCh <- []string{"gzip"} + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err) + } + + wantGrpcAcceptEncodingCh <- []string{"gzip"} + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}, grpc.UseCompressor("gzip")); err != nil { + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err) + } + + // Use compressor directly which is not registered via + // encoding.RegisterCompressor. + if err := ss.StartClient(grpc.WithCompressor(&compressor)); err != nil { + t.Fatalf("Error starting client: %v", err) + } + wantGrpcAcceptEncodingCh <- []string{"gzip,testgzip"} + if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err) + } +} + func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) { const mdkey = "somedata"