Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Add started RPC metric for client and server side (#1283)
Browse files Browse the repository at this point in the history
* Add started RPC metric for client and server side
  • Loading branch information
zasweq committed Nov 3, 2022
1 parent 0bf7faa commit b1a01ee
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 4 deletions.
9 changes: 9 additions & 0 deletions plugin/ocgrpc/client_metrics.go
Expand Up @@ -28,6 +28,7 @@ var (
ClientReceivedMessagesPerRPC = stats.Int64("grpc.io/client/received_messages_per_rpc", "Number of response messages received per RPC (always 1 for non-streaming RPCs).", stats.UnitDimensionless)
ClientReceivedBytesPerRPC = stats.Int64("grpc.io/client/received_bytes_per_rpc", "Total bytes received across all response messages per RPC.", stats.UnitBytes)
ClientRoundtripLatency = stats.Float64("grpc.io/client/roundtrip_latency", "Time between first byte of request sent to last byte of response received, or terminal error.", stats.UnitMilliseconds)
ClientStartedRPCs = stats.Int64("grpc.io/client/started_rpcs", "Number of started client RPCs.", stats.UnitDimensionless)
ClientServerLatency = stats.Float64("grpc.io/client/server_latency", `Propagated from the server and should have the same value as "grpc.io/server/latency".`, stats.UnitMilliseconds)
)

Expand Down Expand Up @@ -70,6 +71,14 @@ var (
Aggregation: view.Count(),
}

ClientStartedRPCsView = &view.View{
Measure: ClientStartedRPCs,
Name: "grpc.io/client/started_rpcs",
Description: "Number of started client RPCs.",
TagKeys: []tag.Key{KeyClientMethod},
Aggregation: view.Count(),
}

ClientSentMessagesPerRPCView = &view.View{
Measure: ClientSentMessagesPerRPC,
Name: "grpc.io/client/sent_messages_per_rpc",
Expand Down
12 changes: 12 additions & 0 deletions plugin/ocgrpc/end_to_end_test.go
Expand Up @@ -40,6 +40,8 @@ func TestEndToEnd_Single(t *testing.T) {
ocgrpc.ClientReceivedMessagesPerRPCView,
ocgrpc.ServerSentMessagesPerRPCView,
ocgrpc.ClientSentMessagesPerRPCView,
ocgrpc.ServerStartedRPCsView,
ocgrpc.ClientStartedRPCsView,
}
view.Register(extraViews...)
defer view.Unregister(extraViews...)
Expand All @@ -63,10 +65,14 @@ func TestEndToEnd_Single(t *testing.T) {
if err != nil {
t.Fatal(err)
}
checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)

_, _ = client.Single(ctx, &testpb.FooRequest{Fail: true})
checkCount(t, ocgrpc.ClientStartedRPCsView, 2, clientMethodTag)
checkCount(t, ocgrpc.ServerStartedRPCsView, 2, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, serverStatusUnknownTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, clientStatusUnknownTag)

Expand Down Expand Up @@ -101,6 +107,7 @@ func TestEndToEnd_Single(t *testing.T) {
func TestEndToEnd_Stream(t *testing.T) {
view.Register(ocgrpc.DefaultClientViews...)
defer view.Unregister(ocgrpc.DefaultClientViews...)

view.Register(ocgrpc.DefaultServerViews...)
defer view.Unregister(ocgrpc.DefaultServerViews...)

Expand All @@ -109,6 +116,8 @@ func TestEndToEnd_Stream(t *testing.T) {
ocgrpc.ClientReceivedMessagesPerRPCView,
ocgrpc.ServerSentMessagesPerRPCView,
ocgrpc.ClientSentMessagesPerRPCView,
ocgrpc.ClientStartedRPCsView,
ocgrpc.ServerStartedRPCsView,
}
view.Register(extraViews...)
defer view.Unregister(extraViews...)
Expand Down Expand Up @@ -146,6 +155,8 @@ func TestEndToEnd_Stream(t *testing.T) {
t.Fatal(err)
}

checkCount(t, ocgrpc.ClientStartedRPCsView, 1, clientMethodTag)
checkCount(t, ocgrpc.ServerStartedRPCsView, 1, serverMethodTag)
checkCount(t, ocgrpc.ClientCompletedRPCsView, 1, clientMethodTag, clientStatusOKTag)
checkCount(t, ocgrpc.ServerCompletedRPCsView, 1, serverMethodTag, serverStatusOKTag)

Expand Down Expand Up @@ -183,6 +194,7 @@ func getCount(t *testing.T, v *view.View, tags ...tag.Tag) (int64, bool) {
return 0, false
}
}

rows, err := view.RetrieveData(v.Name)
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 9 additions & 0 deletions plugin/ocgrpc/server_metrics.go
Expand Up @@ -27,6 +27,7 @@ var (
ServerReceivedBytesPerRPC = stats.Int64("grpc.io/server/received_bytes_per_rpc", "Total bytes received across all messages per RPC.", stats.UnitBytes)
ServerSentMessagesPerRPC = stats.Int64("grpc.io/server/sent_messages_per_rpc", "Number of messages sent in each RPC. Has value 1 for non-streaming RPCs.", stats.UnitDimensionless)
ServerSentBytesPerRPC = stats.Int64("grpc.io/server/sent_bytes_per_rpc", "Total bytes sent in across all response messages per RPC.", stats.UnitBytes)
ServerStartedRPCs = stats.Int64("grpc.io/server/started_rpcs", "Number of started server RPCs.", stats.UnitDimensionless)
ServerLatency = stats.Float64("grpc.io/server/server_latency", "Time between first byte of request received to last byte of response sent, or terminal error.", stats.UnitMilliseconds)
)

Expand Down Expand Up @@ -73,6 +74,14 @@ var (
Aggregation: view.Count(),
}

ServerStartedRPCsView = &view.View{
Measure: ServerStartedRPCs,
Name: "grpc.io/server/started_rpcs",
Description: "Number of started server RPCs.",
TagKeys: []tag.Key{KeyServerMethod},
Aggregation: view.Count(),
}

ServerReceivedMessagesPerRPCView = &view.View{
Name: "grpc.io/server/received_messages_per_rpc",
Description: "Distribution of messages received count per RPC, by method.",
Expand Down
23 changes: 22 additions & 1 deletion plugin/ocgrpc/stats_common.go
Expand Up @@ -82,8 +82,10 @@ func methodName(fullname string) string {
// statsHandleRPC processes the RPC events.
func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
switch st := s.(type) {
case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
case *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
// do nothing for client
case *stats.Begin:
handleRPCBegin(ctx, st)
case *stats.OutPayload:
handleRPCOutPayload(ctx, st)
case *stats.InPayload:
Expand All @@ -95,6 +97,25 @@ func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
}
}

func handleRPCBegin(ctx context.Context, s *stats.Begin) {
d, ok := ctx.Value(rpcDataKey).(*rpcData)
if !ok {
if grpclog.V(2) {
grpclog.Infoln("Failed to retrieve *rpcData from context.")
}
}

if s.IsClient() {
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
ocstats.WithMeasurements(ClientStartedRPCs.M(1)))
} else {
ocstats.RecordWithOptions(ctx,
ocstats.WithTags(tag.Upsert(KeyClientMethod, methodName(d.method))),
ocstats.WithMeasurements(ServerStartedRPCs.M(1)))
}
}

func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
d, ok := ctx.Value(rpcDataKey).(*rpcData)
if !ok {
Expand Down
6 changes: 3 additions & 3 deletions stats/view/worker_test.go
Expand Up @@ -180,14 +180,14 @@ func Test_Worker_MultiExport(t *testing.T) {

// Format is Resource.Labels encoded as string, then
wantPartialData := map[string][]*Row{
makeKey(nil, count.Name): []*Row{
makeKey(nil, count.Name): {
{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
makeKey(nil, sum.Name): []*Row{
makeKey(nil, sum.Name): {
{nil, &SumData{Value: 7.5}},
},
makeKey(&extraResource, count.Name): []*Row{
makeKey(&extraResource, count.Name): {
{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
},
}
Expand Down

0 comments on commit b1a01ee

Please sign in to comment.