From de0849b035d2b2bb0a42398d35e8c7230ae03f58 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 24 Jul 2019 11:20:12 -0400 Subject: [PATCH] server: populate WireLength on stats.InPayload for unary RPCs Fixes #2692 which was incompletely fixed by #2711. Also adds a basic unit test which provides scaffolding for further testing of the stats logic. --- server.go | 9 ++-- server_stats_test.go | 110 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 server_stats_test.go diff --git a/server.go b/server.go index 04968fecb7f3..3d482dba5656 100644 --- a/server.go +++ b/server.go @@ -971,10 +971,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } if sh != nil { sh.HandleRPC(stream.Context(), &stats.InPayload{ - RecvTime: time.Now(), - Payload: v, - Data: d, - Length: len(d), + RecvTime: time.Now(), + Payload: v, + WireLength: payInfo.wireLength, + Data: d, + Length: len(d), }) } if binlog != nil { diff --git a/server_stats_test.go b/server_stats_test.go new file mode 100644 index 000000000000..093623e92f3a --- /dev/null +++ b/server_stats_test.go @@ -0,0 +1,110 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc_test + +import ( + "context" + "net" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/stats" +) + +// TestStatsHandlerRPCWireLength ensure that the WireLength fields of stats.RPCStats +// information carries sane values. +func TestStatsHandlerRPCWireLength(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("failed to create listener: %v", err) + } + sh := statsHandler{ + handleRPC: func(_ context.Context, rpcStats stats.RPCStats) { + switch s := rpcStats.(type) { + case *stats.InPayload: + if len(s.Data) > 0 && s.WireLength == 0 { + t.Errorf("WireLength cannot be zero for a non-zero InPayload: %+v", s) + } + case *stats.OutPayload: + if len(s.Data) > 0 && s.WireLength == 0 { + t.Errorf("WireLength cannot be zero for a non-zero OutPayload: %+v", s) + } + } + }, + } + server := grpc.NewServer( + grpc.StatsHandler(&sh), + ) + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(server, healthServer) + + defer server.Stop() + go server.Serve(lis) + c, err := grpc.Dial(lis.Addr().String(), grpc.WithStatsHandler(&sh), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer c.Close() + if !c.WaitForStateChange(context.TODO(), connectivity.Ready) { + t.Fatal() + } + const service = "foo" + healthServer.SetServingStatus(service, grpc_health_v1.HealthCheckResponse_SERVING) + hc := grpc_health_v1.NewHealthClient(c) + req := &grpc_health_v1.HealthCheckRequest{ + Service: service, + } + if _, err := hc.Check(context.TODO(), req); err != nil { + t.Fatalf("failed to check health: %v", err) + } + w, err := hc.Watch(context.TODO(), req) + if err != nil { + t.Fatalf("failed to watch: %v", err) + } + if _, err := w.Recv(); err != nil { + t.Fatalf("failed to receive a health update: %v", err) + } +} + +// statsHandler implements stats.Handler and allows clients to mock the HandleRPC method. +type statsHandler struct { + handleRPC func(ctx context.Context, rpcStats stats.RPCStats) +} + +var _ stats.Handler = (*statsHandler)(nil) + +func (s *statsHandler) HandleConn(context.Context, stats.ConnStats) { +} + +func (s *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} + +func (s *statsHandler) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) { + if s.handleRPC != nil { + s.handleRPC(ctx, rpcStats) + } +} + +func (s *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx +}